View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.SocketAddress;
23  import java.nio.channels.ClosedSelectorException;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Queue;
31  import java.util.Set;
32  import java.util.concurrent.ConcurrentHashMap;
33  import java.util.concurrent.ConcurrentLinkedQueue;
34  import java.util.concurrent.Executor;
35  import java.util.concurrent.Executors;
36  import java.util.concurrent.Semaphore;
37  import java.util.concurrent.atomic.AtomicReference;
38  
39  import org.apache.mina.core.RuntimeIoException;
40  import org.apache.mina.core.filterchain.IoFilter;
41  import org.apache.mina.core.service.AbstractIoAcceptor;
42  import org.apache.mina.core.service.IoAcceptor;
43  import org.apache.mina.core.service.IoHandler;
44  import org.apache.mina.core.service.IoProcessor;
45  import org.apache.mina.core.service.SimpleIoProcessorPool;
46  import org.apache.mina.core.session.AbstractIoSession;
47  import org.apache.mina.core.session.IoSession;
48  import org.apache.mina.core.session.IoSessionConfig;
49  import org.apache.mina.transport.socket.SocketSessionConfig;
50  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
51  import org.apache.mina.util.ExceptionMonitor;
52  
53  /**
54   * A base class for implementing transport using a polling strategy. The
55   * underlying sockets will be checked in an active loop and woke up when an
56   * socket needed to be processed. This class handle the logic behind binding,
57   * accepting and disposing the server sockets. An {@link Executor} will be used
58   * for running client accepting and an {@link AbstractPollingIoProcessor} will
59   * be used for processing client I/O operations like reading, writing and
60   * closing.
61   * 
62   * All the low level methods for binding, accepting, closing need to be provided
63   * by the subclassing implementation.
64   * 
65   * @see NioSocketAcceptor for a example of implementation
66   * 
67   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
68   */
69  public abstract class AbstractPollingIoAcceptor<S extends AbstractIoSession, H> extends AbstractIoAcceptor {
70      /** A lock used to protect the selector to be waked up before it's created */
71      private final Semaphore lock = new Semaphore(1);
72  
73      private final IoProcessor<S> processor;
74  
75      private final boolean createdProcessor;
76  
77      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
78  
79      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
80  
81      private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
82  
83      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
84  
85      /** A flag set when the acceptor has been created and initialized */
86      private volatile boolean selectable;
87  
88      /** The thread responsible of accepting incoming requests */
89      private AtomicReference<Acceptor> acceptorRef = new AtomicReference<Acceptor>();
90  
91      protected boolean reuseAddress = false;
92  
93      /**
94       * Define the number of socket that can wait to be accepted. Default
95       * to 50 (as in the SocketServer default).
96       */
97      protected int backlog = 50;
98  
99      /**
100      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
101      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
102      * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
103      * pool size will be used.
104      * 
105      * @see SimpleIoProcessorPool
106      * 
107      * @param sessionConfig
108      *            the default configuration for the managed {@link IoSession}
109      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
110      *            type.
111      */
112     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
113         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true);
114     }
115 
116     /**
117      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
118      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
119      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
120      * systems.
121      * 
122      * @see SimpleIoProcessorPool
123      * 
124      * @param sessionConfig
125      *            the default configuration for the managed {@link IoSession}
126      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
127      *            type.
128      * @param processorCount the amount of processor to instantiate for the pool
129      */
130     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass,
131             int processorCount) {
132         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass, processorCount), true);
133     }
134 
135     /**
136      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
137      * session configuration, a default {@link Executor} will be created using
138      * {@link Executors#newCachedThreadPool()}.
139      * 
140      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
141      * 
142      * @param sessionConfig
143      *            the default configuration for the managed {@link IoSession}
144      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
145      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
146      */
147     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<S> processor) {
148         this(sessionConfig, null, processor, false);
149     }
150 
151     /**
152      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
153      * session configuration and an {@link Executor} for handling I/O events. If a
154      * null {@link Executor} is provided, a default one will be created using
155      * {@link Executors#newCachedThreadPool()}.
156      * 
157      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
158      * 
159      * @param sessionConfig
160      *            the default configuration for the managed {@link IoSession}
161      * @param executor
162      *            the {@link Executor} used for handling asynchronous execution of I/O
163      *            events. Can be <code>null</code>.
164      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering
165      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
166      */
167     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor) {
168         this(sessionConfig, executor, processor, false);
169     }
170 
171     /**
172      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
173      * session configuration and an {@link Executor} for handling I/O events. If a
174      * null {@link Executor} is provided, a default one will be created using
175      * {@link Executors#newCachedThreadPool()}.
176      * 
177      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
178      * 
179      * @param sessionConfig
180      *            the default configuration for the managed {@link IoSession}
181      * @param executor
182      *            the {@link Executor} used for handling asynchronous execution of I/O
183      *            events. Can be <code>null</code>.
184      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of
185      * this transport, triggering events to the bound {@link IoHandler} and processing
186      * the chains of {@link IoFilter}
187      * @param createdProcessor tagging the processor as automatically created, so it
188      * will be automatically disposed
189      */
190     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
191             boolean createdProcessor) {
192         super(sessionConfig, executor);
193 
194         if (processor == null) {
195             throw new IllegalArgumentException("processor");
196         }
197 
198         this.processor = processor;
199         this.createdProcessor = createdProcessor;
200 
201         try {
202             // Initialize the selector
203             init();
204 
205             // The selector is now ready, we can switch the
206             // flag to true so that incoming connection can be accepted
207             selectable = true;
208         } catch (RuntimeException e) {
209             throw e;
210         } catch (Exception e) {
211             throw new RuntimeIoException("Failed to initialize.", e);
212         } finally {
213             if (!selectable) {
214                 try {
215                     destroy();
216                 } catch (Exception e) {
217                     ExceptionMonitor.getInstance().exceptionCaught(e);
218                 }
219             }
220         }
221     }
222 
223     /**
224      * Initialize the polling system, will be called at construction time.
225      * @throws Exception any exception thrown by the underlying system calls
226      */
227     protected abstract void init() throws Exception;
228 
229     /**
230      * Destroy the polling system, will be called when this {@link IoAcceptor}
231      * implementation will be disposed.
232      * @throws Exception any exception thrown by the underlying systems calls
233      */
234     protected abstract void destroy() throws Exception;
235 
236     /**
237      * Check for acceptable connections, interrupt when at least a server is ready for accepting.
238      * All the ready server socket descriptors need to be returned by {@link #selectedHandles()}
239      * @return The number of sockets having got incoming client
240      * @throws Exception any exception thrown by the underlying systems calls
241      */
242     protected abstract int select() throws Exception;
243 
244     /**
245      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
246      */
247     protected abstract void wakeup();
248 
249     /**
250      * {@link Iterator} for the set of server sockets found with acceptable incoming connections
251      *  during the last {@link #select()} call.
252      * @return the list of server handles ready
253      */
254     protected abstract Iterator<H> selectedHandles();
255 
256     /**
257      * Open a server socket for a given local address.
258      * @param localAddress the associated local address
259      * @return the opened server socket
260      * @throws Exception any exception thrown by the underlying systems calls
261      */
262     protected abstract H open(SocketAddress localAddress) throws Exception;
263 
264     /**
265      * Get the local address associated with a given server socket
266      * @param handle the server socket
267      * @return the local {@link SocketAddress} associated with this handle
268      * @throws Exception any exception thrown by the underlying systems calls
269      */
270     protected abstract SocketAddress localAddress(H handle) throws Exception;
271 
272     /**
273      * Accept a client connection for a server socket and return a new {@link IoSession}
274      * associated with the given {@link IoProcessor}
275      * @param processor the {@link IoProcessor} to associate with the {@link IoSession}
276      * @param handle the server handle
277      * @return the created {@link IoSession}
278      * @throws Exception any exception thrown by the underlying systems calls
279      */
280     protected abstract S accept(IoProcessor<S> processor, H handle) throws Exception;
281 
282     /**
283      * Close a server socket.
284      * @param handle the server socket
285      * @throws Exception any exception thrown by the underlying systems calls
286      */
287     protected abstract void close(H handle) throws Exception;
288 
289     /**
290      * {@inheritDoc}
291      */
292     @Override
293     protected void dispose0() throws Exception {
294         unbind();
295 
296         startupAcceptor();
297         wakeup();
298     }
299 
300     /**
301      * {@inheritDoc}
302      */
303     @Override
304     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
305         // Create a bind request as a Future operation. When the selector
306         // have handled the registration, it will signal this future.
307         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
308 
309         // adds the Registration request to the queue for the Workers
310         // to handle
311         registerQueue.add(request);
312 
313         // creates the Acceptor instance and has the local
314         // executor kick it off.
315         startupAcceptor();
316 
317         // As we just started the acceptor, we have to unblock the select()
318         // in order to process the bind request we just have added to the
319         // registerQueue.
320         try {
321             lock.acquire();
322 
323             // Wait a bit to give a chance to the Acceptor thread to do the select()
324             Thread.sleep(10);
325             wakeup();
326         } finally {
327             lock.release();
328         }
329 
330         // Now, we wait until this request is completed.
331         request.awaitUninterruptibly();
332 
333         if (request.getException() != null) {
334             throw request.getException();
335         }
336 
337         // Update the local addresses.
338         // setLocalAddresses() shouldn't be called from the worker thread
339         // because of deadlock.
340         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
341 
342         for (H handle : boundHandles.values()) {
343             newLocalAddresses.add(localAddress(handle));
344         }
345 
346         return newLocalAddresses;
347     }
348 
349     /**
350      * This method is called by the doBind() and doUnbind()
351      * methods.  If the acceptor is null, the acceptor object will
352      * be created and kicked off by the executor.  If the acceptor
353      * object is null, probably already created and this class
354      * is now working, then nothing will happen and the method
355      * will just return.
356      */
357     private void startupAcceptor() throws InterruptedException {
358         // If the acceptor is not ready, clear the queues
359         // TODO : they should already be clean : do we have to do that ?
360         if (!selectable) {
361             registerQueue.clear();
362             cancelQueue.clear();
363         }
364 
365         // start the acceptor if not already started
366         Acceptor acceptor = acceptorRef.get();
367 
368         if (acceptor == null) {
369             lock.acquire();
370             acceptor = new Acceptor();
371 
372             if (acceptorRef.compareAndSet(null, acceptor)) {
373                 executeWorker(acceptor);
374             } else {
375                 lock.release();
376             }
377         }
378     }
379 
380     /**
381      * {@inheritDoc}
382      */
383     @Override
384     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
385         AcceptorOperationFuture future = new AcceptorOperationFuture(localAddresses);
386 
387         cancelQueue.add(future);
388         startupAcceptor();
389         wakeup();
390 
391         future.awaitUninterruptibly();
392         if (future.getException() != null) {
393             throw future.getException();
394         }
395     }
396 
397     /**
398      * This class is called by the startupAcceptor() method and is
399      * placed into a NamePreservingRunnable class.
400      * It's a thread accepting incoming connections from clients.
401      * The loop is stopped when all the bound handlers are unbound.
402      */
403     private class Acceptor implements Runnable {
404         public void run() {
405             assert (acceptorRef.get() == this);
406 
407             int nHandles = 0;
408 
409             // Release the lock
410             lock.release();
411 
412             while (selectable) {
413                 try {
414                     // Detect if we have some keys ready to be processed
415                     // The select() will be woke up if some new connection
416                     // have occurred, or if the selector has been explicitly
417                     // woke up
418                     int selected = select();
419 
420                     // this actually sets the selector to OP_ACCEPT,
421                     // and binds to the port on which this class will
422                     // listen on
423                     nHandles += registerHandles();
424 
425                     // Now, if the number of registred handles is 0, we can
426                     // quit the loop: we don't have any socket listening
427                     // for incoming connection.
428                     if (nHandles == 0) {
429                         acceptorRef.set(null);
430 
431                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
432                             assert (acceptorRef.get() != this);
433                             break;
434                         }
435 
436                         if (!acceptorRef.compareAndSet(null, this)) {
437                             assert (acceptorRef.get() != this);
438                             break;
439                         }
440 
441                         assert (acceptorRef.get() == this);
442                     }
443 
444                     if (selected > 0) {
445                         // We have some connection request, let's process
446                         // them here.
447                         processHandles(selectedHandles());
448                     }
449 
450                     // check to see if any cancellation request has been made.
451                     nHandles -= unregisterHandles();
452                 } catch (ClosedSelectorException cse) {
453                     // If the selector has been closed, we can exit the loop
454                     break;
455                 } catch (Throwable e) {
456                     ExceptionMonitor.getInstance().exceptionCaught(e);
457 
458                     try {
459                         Thread.sleep(1000);
460                     } catch (InterruptedException e1) {
461                         ExceptionMonitor.getInstance().exceptionCaught(e1);
462                     }
463                 }
464             }
465 
466             // Cleanup all the processors, and shutdown the acceptor.
467             if (selectable && isDisposing()) {
468                 selectable = false;
469                 try {
470                     if (createdProcessor) {
471                         processor.dispose();
472                     }
473                 } finally {
474                     try {
475                         synchronized (disposalLock) {
476                             if (isDisposing()) {
477                                 destroy();
478                             }
479                         }
480                     } catch (Exception e) {
481                         ExceptionMonitor.getInstance().exceptionCaught(e);
482                     } finally {
483                         disposalFuture.setDone();
484                     }
485                 }
486             }
487         }
488 
489         /**
490          * This method will process new sessions for the Worker class.  All
491          * keys that have had their status updates as per the Selector.selectedKeys()
492          * method will be processed here.  Only keys that are ready to accept
493          * connections are handled here.
494          * <p/>
495          * Session objects are created by making new instances of SocketSessionImpl
496          * and passing the session object to the SocketIoProcessor class.
497          */
498         @SuppressWarnings("unchecked")
499         private void processHandles(Iterator<H> handles) throws Exception {
500             while (handles.hasNext()) {
501                 H handle = handles.next();
502                 handles.remove();
503 
504                 // Associates a new created connection to a processor,
505                 // and get back a session
506                 S session = accept(processor, handle);
507 
508                 if (session == null) {
509                     continue;
510                 }
511 
512                 initSession(session, null, null);
513 
514                 // add the session to the SocketIoProcessor
515                 session.getProcessor().add(session);
516             }
517         }
518     }
519 
520     /**
521      * Sets up the socket communications.  Sets items such as:
522      * <p/>
523      * Blocking
524      * Reuse address
525      * Receive buffer size
526      * Bind to listen port
527      * Registers OP_ACCEPT for selector
528      */
529     private int registerHandles() {
530         for (;;) {
531             // The register queue contains the list of services to manage
532             // in this acceptor.
533             AcceptorOperationFuture future = registerQueue.poll();
534 
535             if (future == null) {
536                 return 0;
537             }
538 
539             // We create a temporary map to store the bound handles,
540             // as we may have to remove them all if there is an exception
541             // during the sockets opening.
542             Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();
543             List<SocketAddress> localAddresses = future.getLocalAddresses();
544 
545             try {
546                 // Process all the addresses
547                 for (SocketAddress a : localAddresses) {
548                     H handle = open(a);
549                     newHandles.put(localAddress(handle), handle);
550                 }
551 
552                 // Everything went ok, we can now update the map storing
553                 // all the bound sockets.
554                 boundHandles.putAll(newHandles);
555 
556                 // and notify.
557                 future.setDone();
558                 return newHandles.size();
559             } catch (Exception e) {
560                 // We store the exception in the future
561                 future.setException(e);
562             } finally {
563                 // Roll back if failed to bind all addresses.
564                 if (future.getException() != null) {
565                     for (H handle : newHandles.values()) {
566                         try {
567                             close(handle);
568                         } catch (Exception e) {
569                             ExceptionMonitor.getInstance().exceptionCaught(e);
570                         }
571                     }
572 
573                     // TODO : add some comment : what is the wakeup() waking up ?
574                     wakeup();
575                 }
576             }
577         }
578     }
579 
580     /**
581      * This method just checks to see if anything has been placed into the
582      * cancellation queue.  The only thing that should be in the cancelQueue
583      * is CancellationRequest objects and the only place this happens is in
584      * the doUnbind() method.
585      */
586     private int unregisterHandles() {
587         int cancelledHandles = 0;
588         for (;;) {
589             AcceptorOperationFuture future = cancelQueue.poll();
590             if (future == null) {
591                 break;
592             }
593 
594             // close the channels
595             for (SocketAddress a : future.getLocalAddresses()) {
596                 H handle = boundHandles.remove(a);
597 
598                 if (handle == null) {
599                     continue;
600                 }
601 
602                 try {
603                     close(handle);
604                     wakeup(); // wake up again to trigger thread death
605                 } catch (Throwable e) {
606                     ExceptionMonitor.getInstance().exceptionCaught(e);
607                 } finally {
608                     cancelledHandles++;
609                 }
610             }
611 
612             future.setDone();
613         }
614 
615         return cancelledHandles;
616     }
617 
618     /**
619      * {@inheritDoc}
620      */
621     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
622         throw new UnsupportedOperationException();
623     }
624 
625     /**
626      * {@inheritDoc}
627      */
628     public int getBacklog() {
629         return backlog;
630     }
631 
632     /**
633      * {@inheritDoc}
634      */
635     public void setBacklog(int backlog) {
636         synchronized (bindLock) {
637             if (isActive()) {
638                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
639             }
640 
641             this.backlog = backlog;
642         }
643     }
644 
645     /**
646      * {@inheritDoc}
647      */
648     public boolean isReuseAddress() {
649         return reuseAddress;
650     }
651 
652     /**
653      * {@inheritDoc}
654      */
655     public void setReuseAddress(boolean reuseAddress) {
656         synchronized (bindLock) {
657             if (isActive()) {
658                 throw new IllegalStateException("backlog can't be set while the acceptor is bound.");
659             }
660 
661             this.reuseAddress = reuseAddress;
662         }
663     }
664     
665     /**
666      * {@inheritDoc}
667      */
668     public SocketSessionConfig getSessionConfig() {
669         return (SocketSessionConfig)sessionConfig;
670     }
671 }