View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.ConnectException;
23  import java.net.SocketAddress;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.apache.mina.core.RuntimeIoException;
33  import org.apache.mina.core.filterchain.IoFilter;
34  import org.apache.mina.core.future.ConnectFuture;
35  import org.apache.mina.core.future.DefaultConnectFuture;
36  import org.apache.mina.core.service.AbstractIoConnector;
37  import org.apache.mina.core.service.AbstractIoService;
38  import org.apache.mina.core.service.IoConnector;
39  import org.apache.mina.core.service.IoHandler;
40  import org.apache.mina.core.service.IoProcessor;
41  import org.apache.mina.core.service.SimpleIoProcessorPool;
42  import org.apache.mina.core.session.AbstractIoSession;
43  import org.apache.mina.core.session.IoSession;
44  import org.apache.mina.core.session.IoSessionConfig;
45  import org.apache.mina.core.session.IoSessionInitializer;
46  import org.apache.mina.transport.socket.nio.NioSocketConnector;
47  import org.apache.mina.util.ExceptionMonitor;
48  
49  /**
50   * A base class for implementing client transport using a polling strategy. The
51   * underlying sockets will be checked in an active loop and woke up when an
52   * socket needed to be processed. This class handle the logic behind binding,
53   * connecting and disposing the client sockets. A {@link Executor} will be used
54   * for running client connection, and an {@link AbstractPollingIoProcessor} will
55   * be used for processing connected client I/O operations like reading, writing
56   * and closing.
57   * 
58   * All the low level methods for binding, connecting, closing need to be
59   * provided by the subclassing implementation.
60   * 
61   * @see NioSocketConnector for a example of implementation
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
66  
67      private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
68  
69      private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
70  
71      private final IoProcessor<T> processor;
72  
73      private final boolean createdProcessor;
74  
75      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
76  
77      private volatile boolean selectable;
78  
79      /** The connector thread */
80      private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
81  
82      /**
83       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
84       * session configuration, a class of {@link IoProcessor} which will be instantiated in a
85       * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
86       * pool size will be used.
87       * 
88       * @see SimpleIoProcessorPool
89       * 
90       * @param sessionConfig
91       *            the default configuration for the managed {@link IoSession}
92       * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
93       *            type.
94       */
95      protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
96          this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
97      }
98  
99      /**
100      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
101      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
102      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
103      * systems.
104      * 
105      * @see SimpleIoProcessorPool
106      * 
107      * @param sessionConfig
108      *            the default configuration for the managed {@link IoSession}
109      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
110      *            type.
111      * @param processorCount the amount of processor to instantiate for the pool
112      */
113     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass,
114             int processorCount) {
115         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
116     }
117 
118     /**
119      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
120      * default session configuration, a default {@link Executor} will be created
121      * using {@link Executors#newCachedThreadPool()}.
122      * 
123      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
124      * 
125      * @param sessionConfig
126      *            the default configuration for the managed {@link IoSession}
127      * @param processor
128      *            the {@link IoProcessor} for processing the {@link IoSession}
129      *            of this transport, triggering events to the bound
130      *            {@link IoHandler} and processing the chains of
131      *            {@link IoFilter}
132      */
133     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
134         this(sessionConfig, null, processor, false);
135     }
136 
137     /**
138      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a
139      * default session configuration and an {@link Executor} for handling I/O
140      * events. If null {@link Executor} is provided, a default one will be
141      * created using {@link Executors#newCachedThreadPool()}.
142      * 
143      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
144      * 
145      * @param sessionConfig
146      *            the default configuration for the managed {@link IoSession}
147      * @param executor
148      *            the {@link Executor} used for handling asynchronous execution
149      *            of I/O events. Can be <code>null</code>.
150      * @param processor
151      *            the {@link IoProcessor} for processing the {@link IoSession}
152      *            of this transport, triggering events to the bound
153      *            {@link IoHandler} and processing the chains of
154      *            {@link IoFilter}
155      */
156     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
157         this(sessionConfig, executor, processor, false);
158     }
159 
160     /**
161      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a
162      * default session configuration and an {@link Executor} for handling I/O
163      * events. If null {@link Executor} is provided, a default one will be
164      * created using {@link Executors#newCachedThreadPool()}.
165      * 
166      * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
167      * 
168      * @param sessionConfig
169      *            the default configuration for the managed {@link IoSession}
170      * @param executor
171      *            the {@link Executor} used for handling asynchronous execution
172      *            of I/O events. Can be <code>null</code>.
173      * @param processor
174      *            the {@link IoProcessor} for processing the {@link IoSession}
175      *            of this transport, triggering events to the bound
176      *            {@link IoHandler} and processing the chains of
177      *            {@link IoFilter}
178      * @param createdProcessor
179      *            tagging the processor as automatically created, so it will be
180      *            automatically disposed
181      */
182     private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor,
183             boolean createdProcessor) {
184         super(sessionConfig, executor);
185 
186         if (processor == null) {
187             throw new IllegalArgumentException("processor");
188         }
189 
190         this.processor = processor;
191         this.createdProcessor = createdProcessor;
192 
193         try {
194             init();
195             selectable = true;
196         } catch (RuntimeException e) {
197             throw e;
198         } catch (Exception e) {
199             throw new RuntimeIoException("Failed to initialize.", e);
200         } finally {
201             if (!selectable) {
202                 try {
203                     destroy();
204                 } catch (Exception e) {
205                     ExceptionMonitor.getInstance().exceptionCaught(e);
206                 }
207             }
208         }
209     }
210 
211     /**
212      * Initialize the polling system, will be called at construction time.
213      * @throws Exception any exception thrown by the underlying system calls
214      */
215     protected abstract void init() throws Exception;
216 
217     /**
218      * Destroy the polling system, will be called when this {@link IoConnector}
219      * implementation will be disposed.
220      * @throws Exception any exception thrown by the underlying systems calls
221      */
222     protected abstract void destroy() throws Exception;
223 
224     /**
225      * Create a new client socket handle from a local {@link SocketAddress}
226      * @param localAddress the socket address for binding the new client socket
227      * @return a new client socket handle
228      * @throws Exception any exception thrown by the underlying systems calls
229      */
230     protected abstract H newHandle(SocketAddress localAddress) throws Exception;
231 
232     /**
233      * Connect a newly created client socket handle to a remote {@link SocketAddress}.
234      * This operation is non-blocking, so at end of the call the socket can be still in connection
235      * process.
236      * @param handle the client socket handle
237      * @param remoteAddress the remote address where to connect
238      * @return <tt>true</tt> if a connection was established, <tt>false</tt> if this client socket
239      *         is in non-blocking mode and the connection operation is in progress
240      * @throws Exception
241      */
242     protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
243 
244     /**
245      * Finish the connection process of a client socket after it was marked as
246      * ready to process by the {@link #select(int)} call. The socket will be
247      * connected or reported as connection failed.
248      * 
249      * @param handle
250      *            the client socket handle to finish to connect
251      * @return true if the socket is connected
252      * @throws Exception
253      *             any exception thrown by the underlying systems calls
254      */
255     protected abstract boolean finishConnect(H handle) throws Exception;
256 
257     /**
258      * Create a new {@link IoSession} from a connected socket client handle.
259      * Will assign the created {@link IoSession} to the given {@link IoProcessor} for
260      * managing future I/O events.
261      * @param processor the processor in charge of this session
262      * @param handle the newly connected client socket handle
263      * @return a new {@link IoSession}
264      * @throws Exception any exception thrown by the underlying systems calls
265      */
266     protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
267 
268     /**
269      * Close a client socket.
270      * @param handle the client socket
271      * @throws Exception any exception thrown by the underlying systems calls
272      */
273     protected abstract void close(H handle) throws Exception;
274 
275     /**
276      * Interrupt the {@link #select(int)} method. Used when the poll set need to
277      * be modified.
278      */
279     protected abstract void wakeup();
280 
281     /**
282      * Check for connected sockets, interrupt when at least a connection is processed (connected or
283      * failed to connect). All the client socket descriptors processed need to be returned by
284      * {@link #selectedHandles()}
285      * @return The number of socket having received some data
286      * @throws Exception any exception thrown by the underlying systems calls
287      */
288     protected abstract int select(int timeout) throws Exception;
289 
290     /**
291      * {@link Iterator} for the set of client sockets found connected or failed
292      * to connect during the last {@link #select(int)} call.
293      * 
294      * @return the list of client socket handles to process
295      */
296     protected abstract Iterator<H> selectedHandles();
297 
298     /**
299      * {@link Iterator} for all the client sockets polled for connection.
300      * @return the list of client sockets currently polled for connection
301      */
302     protected abstract Iterator<H> allHandles();
303 
304     /**
305      * Register a new client socket for connection, add it to connection polling
306      * @param handle client socket handle
307      * @param request the associated {@link ConnectionRequest}
308      * @throws Exception any exception thrown by the underlying systems calls
309      */
310     protected abstract void register(H handle, ConnectionRequest request) throws Exception;
311 
312     /**
313      * get the {@link ConnectionRequest} for a given client socket handle
314      * @param handle the socket client handle
315      * @return the connection request if the socket is connecting otherwise <code>null</code>
316      */
317     protected abstract ConnectionRequest getConnectionRequest(H handle);
318 
319     /**
320      * {@inheritDoc}
321      */
322     @Override
323     protected final void dispose0() throws Exception {
324         startupWorker();
325         wakeup();
326     }
327 
328     /**
329      * {@inheritDoc}
330      */
331     @Override
332     @SuppressWarnings("unchecked")
333     protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
334             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
335         H handle = null;
336         boolean success = false;
337         try {
338             handle = newHandle(localAddress);
339             if (connect(handle, remoteAddress)) {
340                 ConnectFuture future = new DefaultConnectFuture();
341                 T session = newSession(processor, handle);
342                 initSession(session, future, sessionInitializer);
343                 // Forward the remaining process to the IoProcessor.
344                 session.getProcessor().add(session);
345                 success = true;
346                 return future;
347             }
348 
349             success = true;
350         } catch (Exception e) {
351             return DefaultConnectFuture.newFailedFuture(e);
352         } finally {
353             if (!success && handle != null) {
354                 try {
355                     close(handle);
356                 } catch (Exception e) {
357                     ExceptionMonitor.getInstance().exceptionCaught(e);
358                 }
359             }
360         }
361 
362         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
363         connectQueue.add(request);
364         startupWorker();
365         wakeup();
366 
367         return request;
368     }
369 
370     private void startupWorker() {
371         if (!selectable) {
372             connectQueue.clear();
373             cancelQueue.clear();
374         }
375 
376         Connector connector = connectorRef.get();
377 
378         if (connector == null) {
379             connector = new Connector();
380 
381             if (connectorRef.compareAndSet(null, connector)) {
382                 executeWorker(connector);
383             }
384         }
385     }
386 
387     private int registerNew() {
388         int nHandles = 0;
389         for (;;) {
390             ConnectionRequest req = connectQueue.poll();
391             if (req == null) {
392                 break;
393             }
394 
395             H handle = req.handle;
396             try {
397                 register(handle, req);
398                 nHandles++;
399             } catch (Exception e) {
400                 req.setException(e);
401                 try {
402                     close(handle);
403                 } catch (Exception e2) {
404                     ExceptionMonitor.getInstance().exceptionCaught(e2);
405                 }
406             }
407         }
408         return nHandles;
409     }
410 
411     private int cancelKeys() {
412         int nHandles = 0;
413 
414         for (;;) {
415             ConnectionRequest req = cancelQueue.poll();
416 
417             if (req == null) {
418                 break;
419             }
420 
421             H handle = req.handle;
422 
423             try {
424                 close(handle);
425             } catch (Exception e) {
426                 ExceptionMonitor.getInstance().exceptionCaught(e);
427             } finally {
428                 nHandles++;
429             }
430         }
431 
432         if (nHandles > 0) {
433             wakeup();
434         }
435 
436         return nHandles;
437     }
438 
439     /**
440      * Process the incoming connections, creating a new session for each
441      * valid connection.
442      */
443     private int processConnections(Iterator<H> handlers) {
444         int nHandles = 0;
445 
446         // Loop on each connection request
447         while (handlers.hasNext()) {
448             H handle = handlers.next();
449             handlers.remove();
450 
451             ConnectionRequest connectionRequest = getConnectionRequest(handle);
452 
453             if (connectionRequest == null) {
454                 continue;
455             }
456 
457             boolean success = false;
458             try {
459                 if (finishConnect(handle)) {
460                     T session = newSession(processor, handle);
461                     initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
462                     // Forward the remaining process to the IoProcessor.
463                     session.getProcessor().add(session);
464                     nHandles++;
465                 }
466                 success = true;
467             } catch (Exception e) {
468                 connectionRequest.setException(e);
469             } finally {
470                 if (!success) {
471                     // The connection failed, we have to cancel it.
472                     cancelQueue.offer(connectionRequest);
473                 }
474             }
475         }
476         return nHandles;
477     }
478 
479     private void processTimedOutSessions(Iterator<H> handles) {
480         long currentTime = System.currentTimeMillis();
481 
482         while (handles.hasNext()) {
483             H handle = handles.next();
484             ConnectionRequest connectionRequest = getConnectionRequest(handle);
485 
486             if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
487                 connectionRequest.setException(new ConnectException("Connection timed out."));
488                 cancelQueue.offer(connectionRequest);
489             }
490         }
491     }
492 
493     private class Connector implements Runnable {
494 
495         public void run() {
496             assert (connectorRef.get() == this);
497 
498             int nHandles = 0;
499 
500             while (selectable) {
501                 try {
502                     // the timeout for select shall be smaller of the connect
503                     // timeout or 1 second...
504                     int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
505                     int selected = select(timeout);
506 
507                     nHandles += registerNew();
508 
509                     // get a chance to get out of the connector loop, if we don't have any more handles
510                     if (nHandles == 0) {
511                         connectorRef.set(null);
512 
513                         if (connectQueue.isEmpty()) {
514                             assert (connectorRef.get() != this);
515                             break;
516                         }
517 
518                         if (!connectorRef.compareAndSet(null, this)) {
519                             assert (connectorRef.get() != this);
520                             break;
521                         }
522 
523                         assert (connectorRef.get() == this);
524                     }
525 
526                     if (selected > 0) {
527                         nHandles -= processConnections(selectedHandles());
528                     }
529 
530                     processTimedOutSessions(allHandles());
531 
532                     nHandles -= cancelKeys();
533                 } catch (ClosedSelectorException cse) {
534                     // If the selector has been closed, we can exit the loop
535                     ExceptionMonitor.getInstance().exceptionCaught(cse);
536                     break;
537                 } catch (Exception e) {
538                     ExceptionMonitor.getInstance().exceptionCaught(e);
539 
540                     try {
541                         Thread.sleep(1000);
542                     } catch (InterruptedException e1) {
543                         ExceptionMonitor.getInstance().exceptionCaught(e1);
544                     }
545                 }
546             }
547 
548             if (selectable && isDisposing()) {
549                 selectable = false;
550                 try {
551                     if (createdProcessor) {
552                         processor.dispose();
553                     }
554                 } finally {
555                     try {
556                         synchronized (disposalLock) {
557                             if (isDisposing()) {
558                                 destroy();
559                             }
560                         }
561                     } catch (Exception e) {
562                         ExceptionMonitor.getInstance().exceptionCaught(e);
563                     } finally {
564                         disposalFuture.setDone();
565                     }
566                 }
567             }
568         }
569     }
570 
571     public final class ConnectionRequest extends DefaultConnectFuture {
572         private final H handle;
573 
574         private final long deadline;
575 
576         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
577 
578         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
579             this.handle = handle;
580             long timeout = getConnectTimeoutMillis();
581             if (timeout <= 0L) {
582                 this.deadline = Long.MAX_VALUE;
583             } else {
584                 this.deadline = System.currentTimeMillis() + timeout;
585             }
586             this.sessionInitializer = callback;
587         }
588 
589         public H getHandle() {
590             return handle;
591         }
592 
593         public long getDeadline() {
594             return deadline;
595         }
596 
597         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
598             return sessionInitializer;
599         }
600 
601         @Override
602         public void cancel() {
603             if (!isDone()) {
604                 super.cancel();
605                 cancelQueue.add(this);
606                 startupWorker();
607                 wakeup();
608             }
609         }
610     }
611 }