View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.net.ConnectException;
23  import java.net.SocketAddress;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.Executor;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.apache.mina.core.RuntimeIoException;
33  import org.apache.mina.core.filterchain.IoFilter;
34  import org.apache.mina.core.future.ConnectFuture;
35  import org.apache.mina.core.future.DefaultConnectFuture;
36  import org.apache.mina.core.service.AbstractIoConnector;
37  import org.apache.mina.core.service.IoConnector;
38  import org.apache.mina.core.service.IoHandler;
39  import org.apache.mina.core.service.IoProcessor;
40  import org.apache.mina.core.service.SimpleIoProcessorPool;
41  import org.apache.mina.core.session.AbstractIoSession;
42  import org.apache.mina.core.session.IoSession;
43  import org.apache.mina.core.session.IoSessionConfig;
44  import org.apache.mina.core.session.IoSessionInitializer;
45  import org.apache.mina.transport.socket.nio.NioSocketConnector;
46  import org.apache.mina.util.ExceptionMonitor;
47  
48  /**
49   * A base class for implementing client transport using a polling strategy. The
50   * underlying sockets will be checked in an active loop and woke up when an
51   * socket needed to be processed. This class handle the logic behind binding,
52   * connecting and disposing the client sockets. A {@link Executor} will be used
53   * for running client connection, and an {@link AbstractPollingIoProcessor} will
54   * be used for processing connected client I/O operations like reading, writing
55   * and closing.
56   * 
57   * All the low level methods for binding, connecting, closing need to be
58   * provided by the subclassing implementation.
59   * 
60   * @see NioSocketConnector for a example of implementation
61   * 
62   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
63   */
64  public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
65  
66      private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
67  
68      private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
69  
70      private final IoProcessor<T> processor;
71  
72      private final boolean createdProcessor;
73  
74      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
75  
76      private volatile boolean selectable;
77  
78      /** The connector thread */
79      private final AtomicReference<Connector> connectorRef = new AtomicReference<Connector>();
80  
81      /**
82       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
83       * session configuration, a class of {@link IoProcessor} which will be instantiated in a
84       * {@link SimpleIoProcessorPool} for better scaling in multiprocessor systems. The default
85       * pool size will be used.
86       * 
87       * @see SimpleIoProcessorPool
88       * 
89       * @param sessionConfig
90       *            the default configuration for the managed {@link IoSession}
91       * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
92       *            type.
93       */
94      protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
95          this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
96      }
97  
98      /**
99       * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
100      * session configuration, a class of {@link IoProcessor} which will be instantiated in a
101      * {@link SimpleIoProcessorPool} for using multiple thread for better scaling in multiprocessor
102      * systems.
103      * 
104      * @see SimpleIoProcessorPool
105      * 
106      * @param sessionConfig
107      *            the default configuration for the managed {@link IoSession}
108      * @param processorClass a {@link Class} of {@link IoProcessor} for the associated {@link IoSession}
109      *            type.
110      * @param processorCount the amount of processor to instantiate for the pool
111      */
112     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass,
113             int processorCount) {
114         this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
115     }
116 
117     /**
118      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
119      * session configuration, a default {@link Executor} will be created using
120      * {@link Executors#newCachedThreadPool()}.
121      * 
122      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
123      * 
124      * @param sessionConfig
125      *            the default configuration for the managed {@link IoSession}
126      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
127      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
128      */
129     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
130         this(sessionConfig, null, processor, false);
131     }
132 
133     /**
134      * Constructor for {@link AbstractPollingIoConnector}. You need to provide a default
135      * session configuration and an {@link Executor} for handling I/O events. If
136      * null {@link Executor} is provided, a default one will be created using
137      * {@link Executors#newCachedThreadPool()}.
138      * 
139      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
140      * 
141      * @param sessionConfig
142      *            the default configuration for the managed {@link IoSession}
143      * @param executor
144      *            the {@link Executor} used for handling asynchronous execution of I/O
145      *            events. Can be <code>null</code>.
146      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
147      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter} 
148      */
149     protected AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
150         this(sessionConfig, executor, processor, false);
151     }
152 
153     /**
154      * Constructor for {@link AbstractPollingIoAcceptor}. You need to provide a default
155      * session configuration and an {@link Executor} for handling I/O events. If
156      * null {@link Executor} is provided, a default one will be created using
157      * {@link Executors#newCachedThreadPool()}.
158      * 
159      * {@see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)}
160      * 
161      * @param sessionConfig
162      *            the default configuration for the managed {@link IoSession}
163      * @param executor
164      *            the {@link Executor} used for handling asynchronous execution of I/O
165      *            events. Can be <code>null</code>.
166      * @param processor the {@link IoProcessor} for processing the {@link IoSession} of this transport, triggering 
167      *            events to the bound {@link IoHandler} and processing the chains of {@link IoFilter}
168      * @param createdProcessor tagging the processor as automatically created, so it will be automatically disposed 
169      */
170     private AbstractPollingIoConnector(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor,
171             boolean createdProcessor) {
172         super(sessionConfig, executor);
173 
174         if (processor == null) {
175             throw new IllegalArgumentException("processor");
176         }
177 
178         this.processor = processor;
179         this.createdProcessor = createdProcessor;
180 
181         try {
182             init();
183             selectable = true;
184         } catch (RuntimeException e) {
185             throw e;
186         } catch (Exception e) {
187             throw new RuntimeIoException("Failed to initialize.", e);
188         } finally {
189             if (!selectable) {
190                 try {
191                     destroy();
192                 } catch (Exception e) {
193                     ExceptionMonitor.getInstance().exceptionCaught(e);
194                 }
195             }
196         }
197     }
198 
199     /**
200      * Initialize the polling system, will be called at construction time.
201      * @throws Exception any exception thrown by the underlying system calls  
202      */
203     protected abstract void init() throws Exception;
204 
205     /**
206      * Destroy the polling system, will be called when this {@link IoConnector}
207      * implementation will be disposed.  
208      * @throws Exception any exception thrown by the underlying systems calls
209      */
210     protected abstract void destroy() throws Exception;
211 
212     /**
213      * Create a new client socket handle from a local {@link SocketAddress}
214      * @param localAddress the socket address for binding the new client socket 
215      * @return a new client socket handle 
216      * @throws Exception any exception thrown by the underlying systems calls
217      */
218     protected abstract H newHandle(SocketAddress localAddress) throws Exception;
219 
220     /**
221      * Connect a newly created client socket handle to a remote {@link SocketAddress}.
222      * This operation is non-blocking, so at end of the call the socket can be still in connection
223      * process.
224      * @param handle the client socket handle
225      * @param remoteAddress the remote address where to connect
226      * @return <tt>true</tt> if a connection was established, <tt>false</tt> if this client socket 
227      *         is in non-blocking mode and the connection operation is in progress
228      * @throws Exception
229      */
230     protected abstract boolean connect(H handle, SocketAddress remoteAddress) throws Exception;
231 
232     /**
233      * Finish the connection process of a client socket after it was marked as ready to process
234      * by the {@link #select(int)} call. The socket will be connected or reported as connection
235      * failed.
236      * @param handle the client socket handle to finsh to connect
237      * @return true if the socket is connected
238      * @throws Exception any exception thrown by the underlying systems calls
239      */
240     protected abstract boolean finishConnect(H handle) throws Exception;
241 
242     /**
243      * Create a new {@link IoSession} from a connected socket client handle.
244      * Will assign the created {@link IoSession} to the given {@link IoProcessor} for
245      * managing future I/O events.
246      * @param processor the processor in charge of this session
247      * @param handle the newly connected client socket handle
248      * @return a new {@link IoSession}
249      * @throws Exception any exception thrown by the underlying systems calls
250      */
251     protected abstract T newSession(IoProcessor<T> processor, H handle) throws Exception;
252 
253     /**
254      * Close a client socket.
255      * @param handle the client socket
256      * @throws Exception any exception thrown by the underlying systems calls
257      */
258     protected abstract void close(H handle) throws Exception;
259 
260     /**
261      * Interrupt the {@link #select()} method. Used when the poll set need to be modified.
262      */
263     protected abstract void wakeup();
264 
265     /**
266      * Check for connected sockets, interrupt when at least a connection is processed (connected or
267      * failed to connect). All the client socket descriptors processed need to be returned by 
268      * {@link #selectedHandles()}
269      * @return The number of socket having received some data
270      * @throws Exception any exception thrown by the underlying systems calls
271      */
272     protected abstract int select(int timeout) throws Exception;
273 
274     /**
275      * {@link Iterator} for the set of client sockets found connected or 
276      * failed to connect during the last {@link #select()} call.
277      * @return the list of client socket handles to process
278      */
279     protected abstract Iterator<H> selectedHandles();
280 
281     /**
282      * {@link Iterator} for all the client sockets polled for connection.
283      * @return the list of client sockets currently polled for connection
284      */
285     protected abstract Iterator<H> allHandles();
286 
287     /**
288      * Register a new client socket for connection, add it to connection polling
289      * @param handle client socket handle 
290      * @param request the associated {@link ConnectionRequest}
291      * @throws Exception any exception thrown by the underlying systems calls
292      */
293     protected abstract void register(H handle, ConnectionRequest request) throws Exception;
294 
295     /**
296      * get the {@link ConnectionRequest} for a given client socket handle
297      * @param handle the socket client handle 
298      * @return the connection request if the socket is connecting otherwise <code>null</code>
299      */
300     protected abstract ConnectionRequest getConnectionRequest(H handle);
301 
302     /**
303      * {@inheritDoc}
304      */
305     @Override
306     protected final void dispose0() throws Exception {
307         startupWorker();
308         wakeup();
309     }
310 
311     /**
312      * {@inheritDoc}
313      */
314     @Override
315     @SuppressWarnings("unchecked")
316     protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
317             IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
318         H handle = null;
319         boolean success = false;
320         try {
321             handle = newHandle(localAddress);
322             if (connect(handle, remoteAddress)) {
323                 ConnectFuture future = new DefaultConnectFuture();
324                 T session = newSession(processor, handle);
325                 initSession(session, future, sessionInitializer);
326                 // Forward the remaining process to the IoProcessor.
327                 session.getProcessor().add(session);
328                 success = true;
329                 return future;
330             }
331 
332             success = true;
333         } catch (Exception e) {
334             return DefaultConnectFuture.newFailedFuture(e);
335         } finally {
336             if (!success && handle != null) {
337                 try {
338                     close(handle);
339                 } catch (Exception e) {
340                     ExceptionMonitor.getInstance().exceptionCaught(e);
341                 }
342             }
343         }
344 
345         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
346         connectQueue.add(request);
347         startupWorker();
348         wakeup();
349 
350         return request;
351     }
352 
353     private void startupWorker() {
354         if (!selectable) {
355             connectQueue.clear();
356             cancelQueue.clear();
357         }
358 
359         Connector connector = connectorRef.get();
360 
361         if (connector == null) {
362             connector = new Connector();
363 
364             if (connectorRef.compareAndSet(null, connector)) {
365                 executeWorker(connector);
366             }
367         }
368     }
369 
370     private int registerNew() {
371         int nHandles = 0;
372         for (;;) {
373             ConnectionRequest req = connectQueue.poll();
374             if (req == null) {
375                 break;
376             }
377 
378             H handle = req.handle;
379             try {
380                 register(handle, req);
381                 nHandles++;
382             } catch (Exception e) {
383                 req.setException(e);
384                 try {
385                     close(handle);
386                 } catch (Exception e2) {
387                     ExceptionMonitor.getInstance().exceptionCaught(e2);
388                 }
389             }
390         }
391         return nHandles;
392     }
393 
394     private int cancelKeys() {
395         int nHandles = 0;
396 
397         for (;;) {
398             ConnectionRequest req = cancelQueue.poll();
399 
400             if (req == null) {
401                 break;
402             }
403 
404             H handle = req.handle;
405 
406             try {
407                 close(handle);
408             } catch (Exception e) {
409                 ExceptionMonitor.getInstance().exceptionCaught(e);
410             } finally {
411                 nHandles++;
412             }
413         }
414 
415         if (nHandles > 0) {
416             wakeup();
417         }
418 
419         return nHandles;
420     }
421 
422     /**
423      * Process the incoming connections, creating a new session for each
424      * valid connection. 
425      */
426     private int processConnections(Iterator<H> handlers) {
427         int nHandles = 0;
428 
429         // Loop on each connection request
430         while (handlers.hasNext()) {
431             H handle = handlers.next();
432             handlers.remove();
433 
434             ConnectionRequest connectionRequest = getConnectionRequest(handle);
435 
436             if (connectionRequest == null) {
437                 continue;
438             }
439 
440             boolean success = false;
441             try {
442                 if (finishConnect(handle)) {
443                     T session = newSession(processor, handle);
444                     initSession(session, connectionRequest, connectionRequest.getSessionInitializer());
445                     // Forward the remaining process to the IoProcessor.
446                     session.getProcessor().add(session);
447                     nHandles++;
448                 }
449                 success = true;
450             } catch (Throwable e) {
451                 connectionRequest.setException(e);
452             } finally {
453                 if (!success) {
454                     // The connection failed, we have to cancel it.
455                     cancelQueue.offer(connectionRequest);
456                 }
457             }
458         }
459         return nHandles;
460     }
461 
462     private void processTimedOutSessions(Iterator<H> handles) {
463         long currentTime = System.currentTimeMillis();
464 
465         while (handles.hasNext()) {
466             H handle = handles.next();
467             ConnectionRequest connectionRequest = getConnectionRequest(handle);
468 
469             if ((connectionRequest != null) && (currentTime >= connectionRequest.deadline)) {
470                 connectionRequest.setException(new ConnectException("Connection timed out."));
471                 cancelQueue.offer(connectionRequest);
472             }
473         }
474     }
475 
476     private class Connector implements Runnable {
477 
478         public void run() {
479             assert (connectorRef.get() == this);
480 
481             int nHandles = 0;
482 
483             while (selectable) {
484                 try {
485                     // the timeout for select shall be smaller of the connect
486                     // timeout or 1 second...
487                     int timeout = (int) Math.min(getConnectTimeoutMillis(), 1000L);
488                     int selected = select(timeout);
489 
490                     nHandles += registerNew();
491 
492                     // get a chance to get out of the connector loop, if we don't have any more handles
493                     if (nHandles == 0) {
494                         connectorRef.set(null);
495 
496                         if (connectQueue.isEmpty()) {
497                             assert (connectorRef.get() != this);
498                             break;
499                         }
500 
501                         if (!connectorRef.compareAndSet(null, this)) {
502                             assert (connectorRef.get() != this);
503                             break;
504                         }
505 
506                         assert (connectorRef.get() == this);
507                     }
508 
509                     if (selected > 0) {
510                         nHandles -= processConnections(selectedHandles());
511                     }
512 
513                     processTimedOutSessions(allHandles());
514 
515                     nHandles -= cancelKeys();
516                 } catch (ClosedSelectorException cse) {
517                     // If the selector has been closed, we can exit the loop
518                     break;
519                 } catch (Throwable e) {
520                     ExceptionMonitor.getInstance().exceptionCaught(e);
521 
522                     try {
523                         Thread.sleep(1000);
524                     } catch (InterruptedException e1) {
525                         ExceptionMonitor.getInstance().exceptionCaught(e1);
526                     }
527                 }
528             }
529 
530             if (selectable && isDisposing()) {
531                 selectable = false;
532                 try {
533                     if (createdProcessor) {
534                         processor.dispose();
535                     }
536                 } finally {
537                     try {
538                         synchronized (disposalLock) {
539                             if (isDisposing()) {
540                                 destroy();
541                             }
542                         }
543                     } catch (Exception e) {
544                         ExceptionMonitor.getInstance().exceptionCaught(e);
545                     } finally {
546                         disposalFuture.setDone();
547                     }
548                 }
549             }
550         }
551     }
552 
553     public final class ConnectionRequest extends DefaultConnectFuture {
554         private final H handle;
555 
556         private final long deadline;
557 
558         private final IoSessionInitializer<? extends ConnectFuture> sessionInitializer;
559 
560         public ConnectionRequest(H handle, IoSessionInitializer<? extends ConnectFuture> callback) {
561             this.handle = handle;
562             long timeout = getConnectTimeoutMillis();
563             if (timeout <= 0L) {
564                 this.deadline = Long.MAX_VALUE;
565             } else {
566                 this.deadline = System.currentTimeMillis() + timeout;
567             }
568             this.sessionInitializer = callback;
569         }
570 
571         public H getHandle() {
572             return handle;
573         }
574 
575         public long getDeadline() {
576             return deadline;
577         }
578 
579         public IoSessionInitializer<? extends ConnectFuture> getSessionInitializer() {
580             return sessionInitializer;
581         }
582 
583         @Override
584         public void cancel() {
585             if (!isDone()) {
586                 super.cancel();
587                 cancelQueue.add(this);
588                 startupWorker();
589                 wakeup();
590             }
591         }
592     }
593 }