View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.ConnectException;
23  import java.net.SocketAddress;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.apache.mina.core.RuntimeIoException;
33  import org.apache.mina.core.filterchain.IoFilter;
34  import org.apache.mina.core.future.ConnectFuture;
35  import org.apache.mina.core.future.DefaultConnectFuture;
36  import org.apache.mina.core.service.AbstractIoConnector;
37  import org.apache.mina.core.service.AbstractIoService;
38  import org.apache.mina.core.service.IoConnector;
39  import org.apache.mina.core.service.IoHandler;
40  import org.apache.mina.core.service.IoProcessor;
41  import org.apache.mina.core.service.SimpleIoProcessorPool;
42  import org.apache.mina.core.session.AbstractIoSession;
43  import org.apache.mina.core.session.IoSession;
44  import org.apache.mina.core.session.IoSessionConfig;
45  import org.apache.mina.core.session.IoSessionInitializer;
46  import org.apache.mina.transport.socket.nio.NioSocketConnector;
47  import org.apache.mina.util.ExceptionMonitor;
48  
49  /**
50   * A base class for implementing client transport using a polling strategy. The
51   * underlying sockets will be checked in an active loop and woke up when an
52   * socket needed to be processed. This class handle the logic behind binding,
53   * connecting and disposing the client sockets. A {@link Executor} will be used
54   * for running client connection, and an {@link AbstractPollingIoProcessor} will
55   * be used for processing connected client I/O operations like reading, writing
56   * and closing.
57   * 
58   * All the low level methods for binding, connecting, closing need to be
59   * provided by the subclassing implementation.
60   * 
61   * @see NioSocketConnector for a example of implementation
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
66  
67      private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
68  
69      private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
70  
71      private final IoProcessor<T> processor;
72  
73      private final boolean createdProcessor;
74  
75      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
76  
77      private volatile boolean selectable;
78  
79      /** The connector thread */
80      private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
81  
82      /**
83       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
84       * default session configuration, a class of {@link IoProcessor} which will
85       * be instantiated in a {@link SimpleIoProcessorPool} for better scaling in
86       * multiprocessor systems. The default pool size will be used.
87       * 
88       * @see SimpleIoProcessorPool
89       * 
90       * @param sessionConfig
91       *            the default configuration for the managed {@link IoSession}
92       * @param processorClass
93       *            a {@link Class} of {@link IoProcessor} for the associated
94       *            {@link IoSession} type.
95       */
96      protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
97          this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
98      }
99  
100     /**
101      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
102      * default session configuration, a class of {@link IoProcessor} which will
103      * be instantiated in a {@link SimpleIoProcessorPool} for using multiple
104      * thread for better scaling in multiprocessor systems.
105      * 
106      * @see SimpleIoProcessorPool
107      * 
108      * @param sessionConfig
109      *            the default configuration for the managed {@link IoSession}
110      * @param processorClass
111      *            a {@link Class} of {@link IoProcessor} for the associated
112      *            {@link IoSession} type.
113      * @param processorCount
114      *            the amount of processor to instantiate for the pool
115      */
116     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass,
117             int processorCount) {
118         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
119     }
120 
121     /**
122      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
123      * default session configuration, a default {@link Executor} will be created
124      * using {@link Executors#newCachedThreadPool()}.
125      * 
126      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
127      * 
128      * @param sessionConfig
129      *            the default configuration for the managed {@link IoSession}
130      * @param processor
131      *            the {@link IoProcessor} for processing the {@link IoSession}
132      *            of this transport, triggering events to the bound
133      *            {@link IoHandler} and processing the chains of
134      *            {@link IoFilter}
135      */
136     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
137         this(sessionConfig, null, processor, false);
138     }
139 
140     /**
141      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
142      * default session configuration and an {@link Executor} for handling I/O
143      * events. If null {@link Executor} is provided, a default one will be
144      * created using {@link Executors#newCachedThreadPool()}.
145      * 
146      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
147      * 
148      * @param sessionConfig
149      *            the default configuration for the managed {@link IoSession}
150      * @param executor
151      *            the {@link Executor} used for handling asynchronous execution
152      *            of I/O events. Can be <code>null</code>.
153      * @param processor
154      *            the {@link IoProcessor} for processing the {@link IoSession}
155      *            of this transport, triggering events to the bound
156      *            {@link IoHandler} and processing the chains of
157      *            {@link IoFilter}
158      */
159     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
160         this(sessionConfig, executor, processor, false);
161     }
162 
163     /**
164      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
165      * default session configuration and an {@link Executor} for handling I/O
166      * events. If null {@link Executor} is provided, a default one will be
167      * created using {@link Executors#newCachedThreadPool()}.
168      * 
169      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
170      * 
171      * @param sessionConfig
172      *            the default configuration for the managed {@link IoSession}
173      * @param executor
174      *            the {@link Executor} used for handling asynchronous execution
175      *            of I/O events. Can be <code>null</code>.
176      * @param processor
177      *            the {@link IoProcessor} for processing the {@link IoSession}
178      *            of this transport, triggering events to the bound
179      *            {@link IoHandler} and processing the chains of
180      *            {@link IoFilter}
181      * @param createdProcessor
182      *            tagging the processor as automatically created, so it will be
183      *            automatically disposed
184      */
185     private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor,
186             boolean createdProcessor) {
187         super(sessionConfig, executor);
188 
189         if (processor == null) {
190             throw new IllegalArgumentException("processor");
191         }
192 
193         this.processor = processor;
194         this.createdProcessor = createdProcessor;
195 
196         try {
197             init();
198             selectable = true;
199         } catch (RuntimeException e) {
200             throw e;
201         } catch (Exception e) {
202             throw new RuntimeIoException("Failed to initialize.", e);
203         } finally {
204             if (!selectable) {
205                 try {
206                     destroy();
207                 } catch (Exception e) {
208                     ExceptionMonitor.getInstance().exceptionCaught(e);
209                 }
210             }
211         }
212     }
213 
214     /**
215      * Initialize the polling system, will be called at construction time.
216      * 
217      * @throws Exception
218      *             any exception thrown by the underlying system calls
219      */
220     protected abstract void init() throws Exception;
221 
222     /**
223      * Destroy the polling system, will be called when this {@link IoConnector}
224      * implementation will be disposed.
225      * 
226      * @throws Exception
227      *             any exception thrown by the underlying systems calls
228      */
229     protected abstract void destroy() throws Exception;
230 
231     /**
232      * Create a new client socket handle from a local {@link SocketAddress}
233      * 
234      * @param localAddress
235      *            the socket address for binding the new client socket
236      * @return a new client socket handle
237      * @throws Exception
238      *             any exception thrown by the underlying systems calls
239      */
240     protected abstract H newHandle(SocketAddress localAddress) throws Exception;
241 
242     /**
243      * Connect a newly created client socket handle to a remote
244      * {@link SocketAddress}. This operation is non-blocking, so at end of the
245      * call the socket can be still in connection process.
246      * 
247      * @param handle
248      *            the client socket handle
249      * @param remoteAddress
250      *            the remote address where to connect
251      * @return <tt>true</tt> if a connection was established, <tt>false</tt> if
252      *         this client socket is in non-blocking mode and the connection
253      *         operation is in progress
254      * @throws Exception
255      */
256     protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
257 
258     /**
259      * Finish the connection process of a client socket after it was marked as
260      * ready to process by the {@link #select(int)} call. The socket will be
261      * connected or reported as connection failed.
262      * 
263      * @param handle
264      *            the client socket handle to finish to connect
265      * @return true if the socket is connected
266      * @throws Exception
267      *             any exception thrown by the underlying systems calls
268      */
269     protected abstract boolean finishConnect(H handle) throws Exception;
270 
271     /**
272      * Create a new {@link IoSession} from a connected socket client handle.
273      * Will assign the created {@link IoSession} to the given
274      * {@link IoProcessor} for managing future I/O events.
275      * 
276      * @param processor
277      *            the processor in charge of this session
278      * @param handle
279      *            the newly connected client socket handle
280      * @return a new {@link IoSession}
281      * @throws Exception
282      *             any exception thrown by the underlying systems calls
283      */
284     protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
285 
286     /**
287      * Close a client socket.
288      * 
289      * @param handle
290      *            the client socket
291      * @throws Exception
292      *             any exception thrown by the underlying systems calls
293      */
294     protected abstract void close(H handle) throws Exception;
295 
296     /**
297      * Interrupt the {@link #select(int)} method. Used when the poll set need to
298      * be modified.
299      */
300     protected abstract void wakeup();
301 
302     /**
303      * Check for connected sockets, interrupt when at least a connection is
304      * processed (connected or failed to connect). All the client socket
305      * descriptors processed need to be returned by {@link #selectedHandles()}
306      * 
307      * @return The number of socket having received some data
308      * @throws Exception
309      *             any exception thrown by the underlying systems calls
310      */
311     protected abstract int select(int timeout) throws Exception;
312 
313     /**
314      * {@link Iterator} for the set of client sockets found connected or failed
315      * to connect during the last {@link #select(int)} call.
316      * 
317      * @return the list of client socket handles to process
318      */
319     protected abstract Iterator<H> selectedHandles();
320 
321     /**
322      * {@link Iterator} for all the client sockets polled for connection.
323      * 
324      * @return the list of client sockets currently polled for connection
325      */
326     protected abstract Iterator<H> allHandles();
327 
328     /**
329      * Register a new client socket for connection, add it to connection polling
330      * 
331      * @param handle
332      *            client socket handle
333      * @param request
334      *            the associated {@link ConnectionRequest}
335      * @throws Exception
336      *             any exception thrown by the underlying systems calls
337      */
338     protected abstract void register(H handle, ConnectionRequest request) throws Exception;
339 
340     /**
341      * get the {@link ConnectionRequest} for a given client socket handle
342      * 
343      * @param handle
344      *            the socket client handle
345      * @return the connection request if the socket is connecting otherwise
346      *         <code>null</code>
347      */
348     protected abstract ConnectionRequest getConnectionRequest(H handle);
349 
350     /**
351      * {@inheritDoc}
352      */
353     @Override
354     protected final void dispose0() throws Exception {
355         startupWorker();
356         wakeup();
357     }
358 
359     /**
360      * {@inheritDoc}
361      */
362     @Override
363     @SuppressWarnings("unchecked")
364     protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
365             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
366         H handle = null;
367         boolean success = false;
368         try {
369             handle = newHandle(localAddress);
370             if (connect(handle, remoteAddress)) {
371                 ConnectFuture future = new DefaultConnectFuture();
372                 T session = newSession(processor, handle);
373                 initSession(session, future, sessionInitializer);
374                 // Forward the remaining process to the IoProcessor.
375                 session.getProcessor().add(session);
376                 success = true;
377                 return future;
378             }
379 
380             success = true;
381         } catch (Exception e) {
382             return DefaultConnectFuture.newFailedFuture(e);
383         } finally {
384             if (!success && handle != null) {
385                 try {
386                     close(handle);
387                 } catch (Exception e) {
388                     ExceptionMonitor.getInstance().exceptionCaught(e);
389                 }
390             }
391         }
392 
393         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
394         connectQueue.add(request);
395         startupWorker();
396         wakeup();
397 
398         return request;
399     }
400 
401     private void startupWorker() {
402         if (!selectable) {
403             connectQueue.clear();
404             cancelQueue.clear();
405         }
406 
407         Connector connector = connectorRef.get();
408 
409         if (connector == null) {
410             connector = new Connector();
411 
412             if (connectorRef.compareAndSet(null, connector)) {
413                 executeWorker(connector);
414             }
415         }
416     }
417 
418     private int registerNew() {
419         int nHandles = 0;
420         for (;;) {
421             ConnectionRequest req = connectQueue.poll();
422             if (req == null) {
423                 break;
424             }
425 
426             H handle = req.handle;
427             try {
428                 register(handle, req);
429                 nHandles++;
430             } catch (Exception e) {
431                 req.setException(e);
432                 try {
433                     close(handle);
434                 } catch (Exception e2) {
435                     ExceptionMonitor.getInstance().exceptionCaught(e2);
436                 }
437             }
438         }
439         return nHandles;
440     }
441 
442     private int cancelKeys() {
443         int nHandles = 0;
444 
445         for (;;) {
446             ConnectionRequest req = cancelQueue.poll();
447 
448             if (req == null) {
449                 break;
450             }
451 
452             H handle = req.handle;
453 
454             try {
455                 close(handle);
456             } catch (Exception e) {
457                 ExceptionMonitor.getInstance().exceptionCaught(e);
458             } finally {
459                 nHandles++;
460             }
461         }
462 
463         if (nHandles > 0) {
464             wakeup();
465         }
466 
467         return nHandles;
468     }
469 
470     /**
471      * Process the incoming connections, creating a new session for each valid
472      * connection.
473      */
474     private int processConnections(Iterator<H> handlers) {
475         int nHandles = 0;
476 
477         // Loop on each connection request
478         while (handlers.hasNext()) {
479             H handle = handlers.next();
480             handlers.remove();
481 
482             ConnectionRequest connectionRequest = getConnectionRequest(handle);
483 
484             if (connectionRequest == null) {
485                 continue;
486             }
487 
488             boolean success = false;
489             try {
490                 if (finishConnect(handle)) {
491                     T session = newSession(processor, handle);
492                     initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
493                     // Forward the remaining process to the IoProcessor.
494                     session.getProcessor().add(session);
495                     nHandles++;
496                 }
497                 success = true;
498             } catch (Exception e) {
499                 connectionRequest.setException(e);
500             } finally {
501                 if (!success) {
502                     // The connection failed, we have to cancel it.
503                     cancelQueue.offer(connectionRequest);
504                 }
505             }
506         }
507         return nHandles;
508     }
509 
510     private void processTimedOutSessions(Iterator<H> handles) {
511         long currentTime = System.currentTimeMillis();
512 
513         while (handles.hasNext()) {
514             H handle = handles.next();
515             ConnectionRequest connectionRequest = getConnectionRequest(handle);
516 
517             if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
518                 connectionRequest.setException(new ConnectException("Connection timed out."));
519                 cancelQueue.offer(connectionRequest);
520             }
521         }
522     }
523 
524     private class Connector implements Runnable {
525 
526         public void run() {
527             assert (connectorRef.get() == this);
528 
529             int nHandles = 0;
530 
531             while (selectable) {
532                 try {
533                     // the timeout for select shall be smaller of the connect
534                     // timeout or 1 second...
535                     int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
536                     int selected = select(timeout);
537 
538                     nHandles += registerNew();
539 
540                     // get a chance to get out of the connector loop, if we
541                     // don't have any more handles
542                     if (nHandles == 0) {
543                         connectorRef.set(null);
544 
545                         if (connectQueue.isEmpty()) {
546                             assert (connectorRef.get() != this);
547                             break;
548                         }
549 
550                         if (!connectorRef.compareAndSet(null, this)) {
551                             assert (connectorRef.get() != this);
552                             break;
553                         }
554 
555                         assert (connectorRef.get() == this);
556                     }
557 
558                     if (selected > 0) {
559                         nHandles -= processConnections(selectedHandles());
560                     }
561 
562                     processTimedOutSessions(allHandles());
563 
564                     nHandles -= cancelKeys();
565                 } catch (ClosedSelectorException cse) {
566                     // If the selector has been closed, we can exit the loop
567                     ExceptionMonitor.getInstance().exceptionCaught(cse);
568                     break;
569                 } catch (Exception e) {
570                     ExceptionMonitor.getInstance().exceptionCaught(e);
571 
572                     try {
573                         Thread.sleep(1000);
574                     } catch (InterruptedException e1) {
575                         ExceptionMonitor.getInstance().exceptionCaught(e1);
576                     }
577                 }
578             }
579 
580             if (selectable && isDisposing()) {
581                 selectable = false;
582                 try {
583                     if (createdProcessor) {
584                         processor.dispose();
585                     }
586                 } finally {
587                     try {
588                         synchronized (disposalLock) {
589                             if (isDisposing()) {
590                                 destroy();
591                             }
592                         }
593                     } catch (Exception e) {
594                         ExceptionMonitor.getInstance().exceptionCaught(e);
595                     } finally {
596                         disposalFuture.setDone();
597                     }
598                 }
599             }
600         }
601     }
602 
603     public final class ConnectionRequest extends DefaultConnectFuture {
604         /** The handle associated with this connection request */
605         private final H handle;
606 
607         /** The time up to this connection request will be valid */
608         private final long deadline;
609 
610         /** The callback to call when the session is initialized */
611         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
612 
613         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
614             this.handle = handle;
615             long timeout = getConnectTimeoutMillis();
616 
617             if (timeout <= 0L) {
618                 this.deadline = Long.MAX_VALUE;
619             } else {
620                 this.deadline = System.currentTimeMillis() + timeout;
621             }
622 
623             this.sessionInitializer = callback;
624         }
625 
626         public H getHandle() {
627             return handle;
628         }
629 
630         public long getDeadline() {
631             return deadline;
632         }
633 
634         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
635             return sessionInitializer;
636         }
637 
638         @Override
639         public boolean cancel() {
640             if (!isDone()) {
641                 boolean justCancelled = super.cancel();
642 
643                 // We haven't cancelled the request before, so add the future
644                 // in the cancel queue.
645                 if (justCancelled) {
646                     cancelQueue.add(this);
647                     startupWorker();
648                     wakeup();
649                 }
650             }
651 
652             return true;
653         }
654     }
655 }