View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.ConnectException;
23  import java.net.SocketAddress;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.apache.mina.core.RuntimeIoException;
33  import org.apache.mina.core.filterchain.IoFilter;
34  import org.apache.mina.core.future.ConnectFuture;
35  import org.apache.mina.core.future.DefaultConnectFuture;
36  import org.apache.mina.core.service.AbstractIoConnector;
37  import org.apache.mina.core.service.AbstractIoService;
38  import org.apache.mina.core.service.IoConnector;
39  import org.apache.mina.core.service.IoHandler;
40  import org.apache.mina.core.service.IoProcessor;
41  import org.apache.mina.core.service.SimpleIoProcessorPool;
42  import org.apache.mina.core.session.AbstractIoSession;
43  import org.apache.mina.core.session.IoSession;
44  import org.apache.mina.core.session.IoSessionConfig;
45  import org.apache.mina.core.session.IoSessionInitializer;
46  import org.apache.mina.transport.socket.nio.NioSocketConnector;
47  import org.apache.mina.util.ExceptionMonitor;
48  
49  /**
50   * A base class for implementing client transport using a polling strategy. The
51   * underlying sockets will be checked in an active loop and woke up when an
52   * socket needed to be processed. This class handle the logic behind binding,
53   * connecting and disposing the client sockets. A {@link Executor} will be used
54   * for running client connection, and an {@link AbstractPollingIoProcessor} will
55   * be used for processing connected client I/O operations like reading, writing
56   * and closing.
57   * 
58   * All the low level methods for binding, connecting, closing need to be
59   * provided by the subclassing implementation.
60   * 
61   * @see NioSocketConnector for a example of implementation
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
66  
67      private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
68  
69      private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
70  
71      private final IoProcessor<T> processor;
72  
73      private final boolean createdProcessor;
74  
75      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
76  
77      private volatile boolean selectable;
78  
79      /** The connector thread */
80      private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
81  
82      /**
83       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
84       * default session configuration, a class of {@link IoProcessor} which will
85       * be instantiated in a {@link SimpleIoProcessorPool} for better scaling in
86       * multiprocessor systems. The default pool size will be used.
87       * 
88       * @see SimpleIoProcessorPool
89       * 
90       * @param sessionConfig
91       *            the default configuration for the managed {@link IoSession}
92       * @param processorClass
93       *            a {@link Class} of {@link IoProcessor} for the associated
94       *            {@link IoSession} type.
95       */
96      protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
97          this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
98      }
99  
100     /**
101      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
102      * default session configuration, a class of {@link IoProcessor} which will
103      * be instantiated in a {@link SimpleIoProcessorPool} for using multiple
104      * thread for better scaling in multiprocessor systems.
105      * 
106      * @see SimpleIoProcessorPool
107      * 
108      * @param sessionConfig
109      *            the default configuration for the managed {@link IoSession}
110      * @param processorClass
111      *            a {@link Class} of {@link IoProcessor} for the associated
112      *            {@link IoSession} type.
113      * @param processorCount
114      *            the amount of processor to instantiate for the pool
115      */
116     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass,
117             int processorCount) {
118         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
119     }
120 
121     /**
122      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
123      * default session configuration, a default {@link Executor} will be created
124      * using {@link Executors#newCachedThreadPool()}.
125      * 
126      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
127      * 
128      * @param sessionConfig
129      *            the default configuration for the managed {@link IoSession}
130      * @param processor
131      *            the {@link IoProcessor} for processing the {@link IoSession}
132      *            of this transport, triggering events to the bound
133      *            {@link IoHandler} and processing the chains of
134      *            {@link IoFilter}
135      */
136     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
137         this(sessionConfig, null, processor, false);
138     }
139 
140     /**
141      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
142      * default session configuration and an {@link Executor} for handling I/O
143      * events. If null {@link Executor} is provided, a default one will be
144      * created using {@link Executors#newCachedThreadPool()}.
145      * 
146      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
147      * 
148      * @param sessionConfig
149      *            the default configuration for the managed {@link IoSession}
150      * @param executor
151      *            the {@link Executor} used for handling asynchronous execution
152      *            of I/O events. Can be <code>null</code>.
153      * @param processor
154      *            the {@link IoProcessor} for processing the {@link IoSession}
155      *            of this transport, triggering events to the bound
156      *            {@link IoHandler} and processing the chains of
157      *            {@link IoFilter}
158      */
159     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
160         this(sessionConfig, executor, processor, false);
161     }
162 
163     /**
164      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
165      * default session configuration and an {@link Executor} for handling I/O
166      * events. If null {@link Executor} is provided, a default one will be
167      * created using {@link Executors#newCachedThreadPool()}.
168      * 
169      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
170      * 
171      * @param sessionConfig
172      *            the default configuration for the managed {@link IoSession}
173      * @param executor
174      *            the {@link Executor} used for handling asynchronous execution
175      *            of I/O events. Can be <code>null</code>.
176      * @param processor
177      *            the {@link IoProcessor} for processing the {@link IoSession}
178      *            of this transport, triggering events to the bound
179      *            {@link IoHandler} and processing the chains of
180      *            {@link IoFilter}
181      * @param createdProcessor
182      *            tagging the processor as automatically created, so it will be
183      *            automatically disposed
184      */
185     private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor,
186             boolean createdProcessor) {
187         super(sessionConfig, executor);
188 
189         if (processor == null) {
190             throw new IllegalArgumentException("processor");
191         }
192 
193         this.processor = processor;
194         this.createdProcessor = createdProcessor;
195 
196         try {
197             init();
198             selectable = true;
199         } catch (RuntimeException e) {
200             throw e;
201         } catch (Exception e) {
202             throw new RuntimeIoException("Failed to initialize.", e);
203         } finally {
204             if (!selectable) {
205                 try {
206                     destroy();
207                 } catch (Exception e) {
208                     ExceptionMonitor.getInstance().exceptionCaught(e);
209                 }
210             }
211         }
212     }
213 
214     /**
215      * Initialize the polling system, will be called at construction time.
216      * 
217      * @throws Exception
218      *             any exception thrown by the underlying system calls
219      */
220     protected abstract void init() throws Exception;
221 
222     /**
223      * Destroy the polling system, will be called when this {@link IoConnector}
224      * implementation will be disposed.
225      * 
226      * @throws Exception
227      *             any exception thrown by the underlying systems calls
228      */
229     protected abstract void destroy() throws Exception;
230 
231     /**
232      * Create a new client socket handle from a local {@link SocketAddress}
233      * 
234      * @param localAddress
235      *            the socket address for binding the new client socket
236      * @return a new client socket handle
237      * @throws Exception
238      *             any exception thrown by the underlying systems calls
239      */
240     protected abstract H newHandle(SocketAddress localAddress) throws Exception;
241 
242     /**
243      * Connect a newly created client socket handle to a remote
244      * {@link SocketAddress}. This operation is non-blocking, so at end of the
245      * call the socket can be still in connection process.
246      * 
247      * @param handle the client socket handle
248      * @param remoteAddress the remote address where to connect
249      * @return <tt>true</tt> if a connection was established, <tt>false</tt> if
250      *         this client socket is in non-blocking mode and the connection
251      *         operation is in progress
252      * @throws Exception If the connect failed
253      */
254     protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
255 
256     /**
257      * Finish the connection process of a client socket after it was marked as
258      * ready to process by the {@link #select(int)} call. The socket will be
259      * connected or reported as connection failed.
260      * 
261      * @param handle
262      *            the client socket handle to finish to connect
263      * @return true if the socket is connected
264      * @throws Exception
265      *             any exception thrown by the underlying systems calls
266      */
267     protected abstract boolean finishConnect(H handle) throws Exception;
268 
269     /**
270      * Create a new {@link IoSession} from a connected socket client handle.
271      * Will assign the created {@link IoSession} to the given
272      * {@link IoProcessor} for managing future I/O events.
273      * 
274      * @param processor
275      *            the processor in charge of this session
276      * @param handle
277      *            the newly connected client socket handle
278      * @return a new {@link IoSession}
279      * @throws Exception
280      *             any exception thrown by the underlying systems calls
281      */
282     protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
283 
284     /**
285      * Close a client socket.
286      * 
287      * @param handle
288      *            the client socket
289      * @throws Exception
290      *             any exception thrown by the underlying systems calls
291      */
292     protected abstract void close(H handle) throws Exception;
293 
294     /**
295      * Interrupt the {@link #select(int)} method. Used when the poll set need to
296      * be modified.
297      */
298     protected abstract void wakeup();
299 
300     /**
301      * Check for connected sockets, interrupt when at least a connection is
302      * processed (connected or failed to connect). All the client socket
303      * descriptors processed need to be returned by {@link #selectedHandles()}
304      * 
305      * @param timeout The timeout for the select() method
306      * @return The number of socket having received some data
307      * @throws Exception any exception thrown by the underlying systems calls
308      */
309     protected abstract int select(int timeout) throws Exception;
310 
311     /**
312      * {@link Iterator} for the set of client sockets found connected or failed
313      * to connect during the last {@link #select(int)} call.
314      * 
315      * @return the list of client socket handles to process
316      */
317     protected abstract Iterator<H> selectedHandles();
318 
319     /**
320      * {@link Iterator} for all the client sockets polled for connection.
321      * 
322      * @return the list of client sockets currently polled for connection
323      */
324     protected abstract Iterator<H> allHandles();
325 
326     /**
327      * Register a new client socket for connection, add it to connection polling
328      * 
329      * @param handle
330      *            client socket handle
331      * @param request
332      *            the associated {@link ConnectionRequest}
333      * @throws Exception
334      *             any exception thrown by the underlying systems calls
335      */
336     protected abstract void register(H handle, ConnectionRequest request) throws Exception;
337 
338     /**
339      * get the {@link ConnectionRequest} for a given client socket handle
340      * 
341      * @param handle
342      *            the socket client handle
343      * @return the connection request if the socket is connecting otherwise
344      *         <code>null</code>
345      */
346     protected abstract ConnectionRequest getConnectionRequest(H handle);
347 
348     /**
349      * {@inheritDoc}
350      */
351     @Override
352     protected final void dispose0() throws Exception {
353         startupWorker();
354         wakeup();
355     }
356 
357     /**
358      * {@inheritDoc}
359      */
360     @Override
361     @SuppressWarnings("unchecked")
362     protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
363             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
364         H handle = null;
365         boolean success = false;
366         try {
367             handle = newHandle(localAddress);
368             if (connect(handle, remoteAddress)) {
369                 ConnectFuture future = new DefaultConnectFuture();
370                 T session = newSession(processor, handle);
371                 initSession(session, future, sessionInitializer);
372                 // Forward the remaining process to the IoProcessor.
373                 session.getProcessor().add(session);
374                 success = true;
375                 return future;
376             }
377 
378             success = true;
379         } catch (Exception e) {
380             return DefaultConnectFuture.newFailedFuture(e);
381         } finally {
382             if (!success && handle != null) {
383                 try {
384                     close(handle);
385                 } catch (Exception e) {
386                     ExceptionMonitor.getInstance().exceptionCaught(e);
387                 }
388             }
389         }
390 
391         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
392         connectQueue.add(request);
393         startupWorker();
394         wakeup();
395 
396         return request;
397     }
398 
399     private void startupWorker() {
400         if (!selectable) {
401             connectQueue.clear();
402             cancelQueue.clear();
403         }
404 
405         Connector connector = connectorRef.get();
406 
407         if (connector == null) {
408             connector = new Connector();
409 
410             if (connectorRef.compareAndSet(null, connector)) {
411                 executeWorker(connector);
412             }
413         }
414     }
415 
416     private int registerNew() {
417         int nHandles = 0;
418         for (;;) {
419             ConnectionRequest req = connectQueue.poll();
420             if (req == null) {
421                 break;
422             }
423 
424             H handle = req.handle;
425             try {
426                 register(handle, req);
427                 nHandles++;
428             } catch (Exception e) {
429                 req.setException(e);
430                 try {
431                     close(handle);
432                 } catch (Exception e2) {
433                     ExceptionMonitor.getInstance().exceptionCaught(e2);
434                 }
435             }
436         }
437         return nHandles;
438     }
439 
440     private int cancelKeys() {
441         int nHandles = 0;
442 
443         for (;;) {
444             ConnectionRequest req = cancelQueue.poll();
445 
446             if (req == null) {
447                 break;
448             }
449 
450             H handle = req.handle;
451 
452             try {
453                 close(handle);
454             } catch (Exception e) {
455                 ExceptionMonitor.getInstance().exceptionCaught(e);
456             } finally {
457                 nHandles++;
458             }
459         }
460 
461         if (nHandles > 0) {
462             wakeup();
463         }
464 
465         return nHandles;
466     }
467 
468     /**
469      * Process the incoming connections, creating a new session for each valid
470      * connection.
471      */
472     private int processConnections(Iterator<H> handlers) {
473         int nHandles = 0;
474 
475         // Loop on each connection request
476         while (handlers.hasNext()) {
477             H handle = handlers.next();
478             handlers.remove();
479 
480             ConnectionRequest connectionRequest = getConnectionRequest(handle);
481 
482             if (connectionRequest == null) {
483                 continue;
484             }
485 
486             boolean success = false;
487             try {
488                 if (finishConnect(handle)) {
489                     T session = newSession(processor, handle);
490                     initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
491                     // Forward the remaining process to the IoProcessor.
492                     session.getProcessor().add(session);
493                     nHandles++;
494                 }
495                 success = true;
496             } catch (Exception e) {
497                 connectionRequest.setException(e);
498             } finally {
499                 if (!success) {
500                     // The connection failed, we have to cancel it.
501                     cancelQueue.offer(connectionRequest);
502                 }
503             }
504         }
505         return nHandles;
506     }
507 
508     private void processTimedOutSessions(Iterator<H> handles) {
509         long currentTime = System.currentTimeMillis();
510 
511         while (handles.hasNext()) {
512             H handle = handles.next();
513             ConnectionRequest connectionRequest = getConnectionRequest(handle);
514 
515             if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
516                 connectionRequest.setException(new ConnectException("Connection timed out."));
517                 cancelQueue.offer(connectionRequest);
518             }
519         }
520     }
521 
522     private class Connector implements Runnable {
523 
524         public void run() {
525             assert (connectorRef.get() == this);
526 
527             int nHandles = 0;
528 
529             while (selectable) {
530                 try {
531                     // the timeout for select shall be smaller of the connect
532                     // timeout or 1 second...
533                     int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
534                     int selected = select(timeout);
535 
536                     nHandles += registerNew();
537 
538                     // get a chance to get out of the connector loop, if we
539                     // don't have any more handles
540                     if (nHandles == 0) {
541                         connectorRef.set(null);
542 
543                         if (connectQueue.isEmpty()) {
544                             assert (connectorRef.get() != this);
545                             break;
546                         }
547 
548                         if (!connectorRef.compareAndSet(null, this)) {
549                             assert (connectorRef.get() != this);
550                             break;
551                         }
552 
553                         assert (connectorRef.get() == this);
554                     }
555 
556                     if (selected > 0) {
557                         nHandles -= processConnections(selectedHandles());
558                     }
559 
560                     processTimedOutSessions(allHandles());
561 
562                     nHandles -= cancelKeys();
563                 } catch (ClosedSelectorException cse) {
564                     // If the selector has been closed, we can exit the loop
565                     ExceptionMonitor.getInstance().exceptionCaught(cse);
566                     break;
567                 } catch (Exception e) {
568                     ExceptionMonitor.getInstance().exceptionCaught(e);
569 
570                     try {
571                         Thread.sleep(1000);
572                     } catch (InterruptedException e1) {
573                         ExceptionMonitor.getInstance().exceptionCaught(e1);
574                     }
575                 }
576             }
577 
578             if (selectable && isDisposing()) {
579                 selectable = false;
580                 try {
581                     if (createdProcessor) {
582                         processor.dispose();
583                     }
584                 } finally {
585                     try {
586                         synchronized (disposalLock) {
587                             if (isDisposing()) {
588                                 destroy();
589                             }
590                         }
591                     } catch (Exception e) {
592                         ExceptionMonitor.getInstance().exceptionCaught(e);
593                     } finally {
594                         disposalFuture.setDone();
595                     }
596                 }
597             }
598         }
599     }
600 
601     public final class ConnectionRequest extends DefaultConnectFuture {
602         /** The handle associated with this connection request */
603         private final H handle;
604 
605         /** The time up to this connection request will be valid */
606         private final long deadline;
607 
608         /** The callback to call when the session is initialized */
609         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
610 
611         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
612             this.handle = handle;
613             long timeout = getConnectTimeoutMillis();
614 
615             if (timeout <= 0L) {
616                 this.deadline = Long.MAX_VALUE;
617             } else {
618                 this.deadline = System.currentTimeMillis() + timeout;
619             }
620 
621             this.sessionInitializer = callback;
622         }
623 
624         public H getHandle() {
625             return handle;
626         }
627 
628         public long getDeadline() {
629             return deadline;
630         }
631 
632         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
633             return sessionInitializer;
634         }
635 
636         @Override
637         public boolean cancel() {
638             if (!isDone()) {
639                 boolean justCancelled = super.cancel();
640 
641                 // We haven't cancelled the request before, so add the future
642                 // in the cancel queue.
643                 if (justCancelled) {
644                     cancelQueue.add(this);
645                     startupWorker();
646                     wakeup();
647                 }
648             }
649 
650             return true;
651         }
652     }
653 }