View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.nio.channels.ClosedSelectorException;
24  import java.nio.channels.spi.SelectorProvider;
25  import java.util.Collections;
26  import java.util.HashMap;
27  import java.util.HashSet;
28  import java.util.Iterator;
29  import java.util.List;
30  import java.util.Map;
31  import java.util.Queue;
32  import java.util.Set;
33  import java.util.concurrent.ConcurrentHashMap;
34  import java.util.concurrent.ConcurrentLinkedQueue;
35  import java.util.concurrent.Executor;
36  import java.util.concurrent.Executors;
37  import java.util.concurrent.Semaphore;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import org.apache.mina.core.RuntimeIoException;
41  import org.apache.mina.core.filterchain.IoFilter;
42  import org.apache.mina.core.service.AbstractIoAcceptor;
43  import org.apache.mina.core.service.AbstractIoService;
44  import org.apache.mina.core.service.IoAcceptor;
45  import org.apache.mina.core.service.IoHandler;
46  import org.apache.mina.core.service.IoProcessor;
47  import org.apache.mina.core.service.SimpleIoProcessorPool;
48  import org.apache.mina.core.session.AbstractIoSession;
49  import org.apache.mina.core.session.IoSession;
50  import org.apache.mina.core.session.IoSessionConfig;
51  import org.apache.mina.transport.socket.SocketSessionConfig;
52  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
53  import org.apache.mina.util.ExceptionMonitor;
54  
55  /**
56   * A base class for implementing transport using a polling strategy. The
57   * underlying sockets will be checked in an active loop and woke up when an
58   * socket needed to be processed. This class handle the logic behind binding,
59   * accepting and disposing the server sockets. An {@link Executor} will be used
60   * for running client accepting and an {@link AbstractPollingIoProcessor} will
61   * be used for processing client I/O operations like reading, writing and
62   * closing.
63   * 
64   * All the low level methods for binding, accepting, closing need to be provided
65   * by the subclassing implementation.
66   * 
67   * @see NioSocketAcceptor for a example of implementation
68   * 
69   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
70   */
71  public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
72      /** A lock used to protect the selector to be waked up before it's created */
73      private final Semaphore lock = new Semaphore(1);
74  
75      private final IoProcessor<S> processor;
76  
77      private final boolean createdProcessor;
78  
79      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
80  
81      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
82  
83      private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
84  
85      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
86  
87      /** A flag set when the acceptor has been created and initialized */
88      private volatile boolean selectable;
89  
90      /** The thread responsible of accepting incoming requests */
91      private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
92  
93      protected boolean reuseAddress = false;
94  
95      /**
96       * Define the number of socket that can wait to be accepted. Default
97       * to 50 (as in the SocketServer default).
98       */
99      protected int backlog = 50;
100 
101     /**
102      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
103      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
104      * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
105      * pool size will be used.
106      * 
107      * @see SimpleIoProcessorPool
108      * 
109      * @param sessionConfig
110      *            the default configuration for the managed {@link IoSession}
111      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
112      *            type.
113      */
114     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
115         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
116     }
117 
118     /**
119      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
120      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
121      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
122      * systems.
123      * 
124      * @see SimpleIoProcessorPool
125      * 
126      * @param sessionConfig
127      *            the default configuration for the managed {@link IoSession}
128      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
129      *            type.
130      * @param processorCount the amount of processor to instantiate for the pool
131      */
132     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
133             int processorCount) {
134         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true, null);
135     }
136 
137     /**
138      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
139      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
140      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
141      * systems.
142      *
143      * @see SimpleIoProcessorPool
144      *
145      * @param sessionConfig
146      *            the default configuration for the managed {@link IoSession}
147      * @param processorClass a {@link Class}�of {@link IoProcessor} for the associated {@link IoSession}
148      *            type.
149      * @param processorCount the amount of processor to instantiate for the pool
150      * @param selectorProvider The SelectorProvider to use
151      */
152     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
153             int processorCount, SelectorProvider selectorProvider ) {
154         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount, selectorProvider), true, selectorProvider);
155     }
156 
157     /**
158      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
159      * session configuration, a default {@link Executor} will be created using
160      * {@link Executors#newCachedThreadPool()}.
161      * 
162      * @see AbstractIoService
163      * 
164      * @param sessionConfig
165      *            the default configuration for the managed {@link IoSession}
166      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
167      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
168      */
169     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
170         this(sessionConfig, null, processor, false, null);
171     }
172 
173     /**
174      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
175      * default session configuration and an {@link Executor} for handling I/O
176      * events. If a null {@link Executor} is provided, a default one will be
177      * created using {@link Executors#newCachedThreadPool()}.
178      * 
179      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
180      * 
181      * @param sessionConfig
182      *            the default configuration for the managed {@link IoSession}
183      * @param executor
184      *            the {@link Executor} used for handling asynchronous execution
185      *            of I/O events. Can be <code>null</code>.
186      * @param processor
187      *            the {@link IoProcessor} for processing the {@link IoSession}
188      *            of this transport, triggering events to the bound
189      *            {@link IoHandler} and processing the chains of
190      *            {@link IoFilter}
191      */
192     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
193         this(sessionConfig, executor, processor, false, null);
194     }
195 
196     /**
197      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
198      * default session configuration and an {@link Executor} for handling I/O
199      * events. If a null {@link Executor} is provided, a default one will be
200      * created using {@link Executors#newCachedThreadPool()}.
201      * 
202      * @see AbstractIoService(IoSessionConfig, Executor)
203      * 
204      * @param sessionConfig
205      *            the default configuration for the managed {@link IoSession}
206      * @param executor
207      *            the {@link Executor} used for handling asynchronous execution
208      *            of I/O events. Can be <code>null</code>.
209      * @param processor
210      *            the {@link IoProcessor} for processing the {@link IoSession}
211      *            of this transport, triggering events to the bound
212      *            {@link IoHandler} and processing the chains of
213      *            {@link IoFilter}
214      * @param createdProcessor
215      *            tagging the processor as automatically created, so it will be
216      *            automatically disposed
217      */
218     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
219             boolean createdProcessor, SelectorProvider selectorProvider) {
220         super(sessionConfig, executor);
221 
222         if (processor == null) {
223             throw new IllegalArgumentException("processor");
224         }
225 
226         this.processor = processor;
227         this.createdProcessor = createdProcessor;
228 
229         try {
230             // Initialize the selector
231             init(selectorProvider);
232 
233             // The selector is now ready, we can switch the
234             // flag to true so that incoming connection can be accepted
235             selectable = true;
236         } catch (RuntimeException e) {
237             throw e;
238         } catch (Exception e) {
239             throw new RuntimeIoException("Failed to initialize.", e);
240         } finally {
241             if (!selectable) {
242                 try {
243                     destroy();
244                 } catch (Exception e) {
245                     ExceptionMonitor.getInstance().exceptionCaught(e);
246                 }
247             }
248         }
249     }
250 
251     /**
252      * Initialize the polling system, will be called at construction time.
253      * @throws Exception any exception thrown by the underlying system calls
254      */
255     protected abstract void init() throws Exception;
256 
257     /**
258      * Initialize the polling system, will be called at construction time.
259      * @throws Exception any exception thrown by the underlying system calls
260      */
261     protected abstract void init(SelectorProvider selectorProvider) throws Exception;
262 
263     /**
264      * Destroy the polling system, will be called when this {@link IoAcceptor}
265      * implementation will be disposed.
266      * @throws Exception any exception thrown by the underlying systems calls
267      */
268     protected abstract void destroy() throws Exception;
269 
270     /**
271      * Check for acceptable connections, interrupt when at least a server is ready for accepting.
272      * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
273      * @return The number of sockets having got incoming client
274      * @throws Exception any exception thrown by the underlying systems calls
275      */
276     protected abstract int select() throws Exception;
277 
278     /**
279      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
280      */
281     protected abstract void wakeup();
282 
283     /**
284      * {@link Iterator} for the set of server sockets found with acceptable incoming connections
285      *  during the last {@link #select()} call.
286      * @return the list of server handles ready
287      */
288     protected abstract Iterator<H> selectedHandles();
289 
290     /**
291      * Open a server socket for a given local address.
292      * @param localAddress the associated local address
293      * @return the opened server socket
294      * @throws Exception any exception thrown by the underlying systems calls
295      */
296     protected abstract H open(SocketAddress localAddress) throws Exception;
297 
298     /**
299      * Get the local address associated with a given server socket
300      * @param handle the server socket
301      * @return the local {@link SocketAddress} associated with this handle
302      * @throws Exception any exception thrown by the underlying systems calls
303      */
304     protected abstract SocketAddress localAddress(H handle) throws Exception;
305 
306     /**
307      * Accept a client connection for a server socket and return a new {@link IoSession}
308      * associated with the given {@link IoProcessor}
309      * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
310      * @param handle the server handle
311      * @return the created {@link IoSession}
312      * @throws Exception any exception thrown by the underlying systems calls
313      */
314     protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
315 
316     /**
317      * Close a server socket.
318      * @param handle the server socket
319      * @throws Exception any exception thrown by the underlying systems calls
320      */
321     protected abstract void close(H handle) throws Exception;
322 
323     /**
324      * {@inheritDoc}
325      */
326     @Override
327     protected void dispose0() throws Exception {
328         unbind();
329 
330         startupAcceptor();
331         wakeup();
332     }
333 
334     /**
335      * {@inheritDoc}
336      */
337     @Override
338     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
339         // Create a bind request as a Future operation. When the selector
340         // have handled the registration, it will signal this future.
341         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
342 
343         // adds the Registration request to the queue for the Workers
344         // to handle
345         registerQueue.add(request);
346 
347         // creates the Acceptor instance and has the local
348         // executor kick it off.
349         startupAcceptor();
350 
351         // As we just started the acceptor, we have to unblock the select()
352         // in order to process the bind request we just have added to the
353         // registerQueue.
354         try {
355             lock.acquire();
356 
357             // Wait a bit to give a chance to the Acceptor thread to do the select()
358             Thread.sleep(10);
359             wakeup();
360         } finally {
361             lock.release();
362         }
363 
364         // Now, we wait until this request is completed.
365         request.awaitUninterruptibly();
366 
367         if (request.getException() != null) {
368             throw request.getException();
369         }
370 
371         // Update the local addresses.
372         // setLocalAddresses() shouldn't be called from the worker thread
373         // because of deadlock.
374         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
375 
376         for (H handle : boundHandles.values()) {
377             newLocalAddresses.add(localAddress(handle));
378         }
379 
380         return newLocalAddresses;
381     }
382 
383     /**
384      * This method is called by the doBind() and doUnbind()
385      * methods.  If the acceptor is null, the acceptor object will
386      * be created and kicked off by the executor.  If the acceptor
387      * object is null, probably already created and this class
388      * is now working, then nothing will happen and the method
389      * will just return.
390      */
391     private void startupAcceptor() throws InterruptedException {
392         // If the acceptor is not ready, clear the queues
393         // TODO : they should already be clean : do we have to do that ?
394         if (!selectable) {
395             registerQueue.clear();
396             cancelQueue.clear();
397         }
398 
399         // start the acceptor if not already started
400         Acceptor acceptor = acceptorRef.get();
401 
402         if (acceptor == null) {
403             lock.acquire();
404             acceptor = new Acceptor();
405 
406             if (acceptorRef.compareAndSet(null, acceptor)) {
407                 executeWorker(acceptor);
408             } else {
409                 lock.release();
410             }
411         }
412     }
413 
414     /**
415      * {@inheritDoc}
416      */
417     @Override
418     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
419         AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
420 
421         cancelQueue.add(future);
422         startupAcceptor();
423         wakeup();
424 
425         future.awaitUninterruptibly();
426         if (future.getException() != null) {
427             throw future.getException();
428         }
429     }
430 
431     /**
432      * This class is called by the startupAcceptor() method and is
433      * placed into a NamePreservingRunnable class.
434      * It's a thread accepting incoming connections from clients.
435      * The loop is stopped when all the bound handlers are unbound.
436      */
437     private class Acceptor implements Runnable {
438         public void run() {
439             assert (acceptorRef.get() == this);
440 
441             int nHandles = 0;
442 
443             // Release the lock
444             lock.release();
445 
446             while (selectable) {
447                 try {
448                     // Detect if we have some keys ready to be processed
449                     // The select() will be woke up if some new connection
450                     // have occurred, or if the selector has been explicitly
451                     // woke up
452                     int selected = select();
453 
454                     // this actually sets the selector to OP_ACCEPT,
455                     // and binds to the port on which this class will
456                     // listen on
457                     nHandles += registerHandles();
458 
459                     // Now, if the number of registred handles is 0, we can
460                     // quit the loop: we don't have any socket listening
461                     // for incoming connection.
462                     if (nHandles == 0) {
463                         acceptorRef.set(null);
464 
465                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
466                             assert (acceptorRef.get() != this);
467                             break;
468                         }
469 
470                         if (!acceptorRef.compareAndSet(null, this)) {
471                             assert (acceptorRef.get() != this);
472                             break;
473                         }
474 
475                         assert (acceptorRef.get() == this);
476                     }
477 
478                     if (selected > 0) {
479                         // We have some connection request, let's process
480                         // them here.
481                         processHandles(selectedHandles());
482                     }
483 
484                     // check to see if any cancellation request has been made.
485                     nHandles -= unregisterHandles();
486                 } catch (ClosedSelectorException cse) {
487                     // If the selector has been closed, we can exit the loop
488                     ExceptionMonitor.getInstance().exceptionCaught(cse);
489                     break;
490                 } catch (Exception e) {
491                     ExceptionMonitor.getInstance().exceptionCaught(e);
492 
493                     try {
494                         Thread.sleep(1000);
495                     } catch (InterruptedException e1) {
496                         ExceptionMonitor.getInstance().exceptionCaught(e1);
497                     }
498                 }
499             }
500 
501             // Cleanup all the processors, and shutdown the acceptor.
502             if (selectable && isDisposing()) {
503                 selectable = false;
504                 try {
505                     if (createdProcessor) {
506                         processor.dispose();
507                     }
508                 } finally {
509                     try {
510                         synchronized (disposalLock) {
511                             if (isDisposing()) {
512                                 destroy();
513                             }
514                         }
515                     } catch (Exception e) {
516                         ExceptionMonitor.getInstance().exceptionCaught(e);
517                     } finally {
518                         disposalFuture.setDone();
519                     }
520                 }
521             }
522         }
523 
524         /**
525          * This method will process new sessions for the Worker class.  All
526          * keys that have had their status updates as per the Selector.selectedKeys()
527          * method will be processed here.  Only keys that are ready to accept
528          * connections are handled here.
529          * <p/>
530          * Session objects are created by making new instances of SocketSessionImpl
531          * and passing the session object to the SocketIoProcessor class.
532          */
533         @SuppressWarnings("unchecked")
534         private void processHandles(Iterator<H> handles) throws Exception {
535             while (handles.hasNext()) {
536                 H handle = handles.next();
537                 handles.remove();
538 
539                 // Associates a new created connection to a processor,
540                 // and get back a session
541                 S session = accept(processor, handle);
542 
543                 if (session == null) {
544                     continue;
545                 }
546 
547                 initSession(session, null, null);
548 
549                 // add the session to the SocketIoProcessor
550                 session.getProcessor().add(session);
551             }
552         }
553     }
554 
555     /**
556      * Sets up the socket communications.  Sets items such as:
557      * <p/>
558      * Blocking
559      * Reuse address
560      * Receive buffer size
561      * Bind to listen port
562      * Registers OP_ACCEPT for selector
563      */
564     private int registerHandles() {
565         for (;;) {
566             // The register queue contains the list of services to manage
567             // in this acceptor.
568             AcceptorOperationFuture future = registerQueue.poll();
569 
570             if (future == null) {
571                 return 0;
572             }
573 
574             // We create a temporary map to store the bound handles,
575             // as we may have to remove them all if there is an exception
576             // during the sockets opening.
577             Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
578             List<SocketAddress> localAddresses = future.getLocalAddresses();
579 
580             try {
581                 // Process all the addresses
582                 for (SocketAddress a : localAddresses) {
583                     H handle = open(a);
584                     newHandles.put(localAddress(handle), handle);
585                 }
586 
587                 // Everything went ok, we can now update the map storing
588                 // all the bound sockets.
589                 boundHandles.putAll(newHandles);
590 
591                 // and notify.
592                 future.setDone();
593                 return newHandles.size();
594             } catch (Exception e) {
595                 // We store the exception in the future
596                 future.setException(e);
597             } finally {
598                 // Roll back if failed to bind all addresses.
599                 if (future.getException() != null) {
600                     for (H handle : newHandles.values()) {
601                         try {
602                             close(handle);
603                         } catch (Exception e) {
604                             ExceptionMonitor.getInstance().exceptionCaught(e);
605                         }
606                     }
607 
608                     // TODO : add some comment : what is the wakeup() waking up ?
609                     wakeup();
610                 }
611             }
612         }
613     }
614 
615     /**
616      * This method just checks to see if anything has been placed into the
617      * cancellation queue.  The only thing that should be in the cancelQueue
618      * is CancellationRequest objects and the only place this happens is in
619      * the doUnbind() method.
620      */
621     private int unregisterHandles() {
622         int cancelledHandles = 0;
623         for (;;) {
624             AcceptorOperationFuture future = cancelQueue.poll();
625             if (future == null) {
626                 break;
627             }
628 
629             // close the channels
630             for (SocketAddress a : future.getLocalAddresses()) {
631                 H handle = boundHandles.remove(a);
632 
633                 if (handle == null) {
634                     continue;
635                 }
636 
637                 try {
638                     close(handle);
639                     wakeup(); // wake up again to trigger thread death
640                 } catch (Exception e) {
641                     ExceptionMonitor.getInstance().exceptionCaught(e);
642                 } finally {
643                     cancelledHandles++;
644                 }
645             }
646 
647             future.setDone();
648         }
649 
650         return cancelledHandles;
651     }
652 
653     /**
654      * {@inheritDoc}
655      */
656     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
657         throw new UnsupportedOperationException();
658     }
659 
660     /**
661      * @return the backLog
662      */
663     public int getBacklog() {
664         return backlog;
665     }
666 
667     /**
668      * Sets the Backlog parameter
669      * 
670      * @param backlog
671      *            the backlog variable
672      */
673     public void setBacklog(int backlog) {
674         synchronized (bindLock) {
675             if (isActive()) {
676                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
677             }
678 
679             this.backlog = backlog;
680         }
681     }
682 
683     /**
684      * @return the flag that sets the reuseAddress information
685      */
686     public boolean isReuseAddress() {
687         return reuseAddress;
688     }
689 
690     /**
691      * Set the Reuse Address flag
692      * 
693      * @param reuseAddress
694      *            The flag to set
695      */
696     public void setReuseAddress(boolean reuseAddress) {
697         synchronized (bindLock) {
698             if (isActive()) {
699                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
700             }
701 
702             this.reuseAddress = reuseAddress;
703         }
704     }
705 
706     /**
707      * {@inheritDoc}
708      */
709     public SocketSessionConfig getSessionConfig() {
710         return (SocketSessionConfig)sessionConfig;
711     }
712 }