View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.ByteChannel;
24  import java.nio.channels.DatagramChannel;
25  import java.nio.channels.SelectableChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.nio.channels.spi.SelectorProvider;
30  import java.util.Iterator;
31  import java.util.Set;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.mina.core.RuntimeIoException;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.FileRegion;
37  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
38  import org.apache.mina.core.session.SessionState;
39  
40  /**
41   * TODO Add documentation
42   *
43   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
44   */
45  public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
46      /** The selector associated with this processor */
47      private Selector selector;
48  
49      private SelectorProvider selectorProvider = null;
50  
51      /**
52       *
53       * Creates a new instance of NioProcessor.
54       *
55       * @param executor The executor to use
56       */
57      public NioProcessor(Executor executor) {
58          super(executor);
59  
60          try {
61              // Open a new selector
62              selector = Selector.open();
63          } catch (IOException e) {
64              throw new RuntimeIoException("Failed to open a selector.", e);
65          }
66      }
67  
68      /**
69       *
70       * Creates a new instance of NioProcessor.
71       *
72       * @param executor The executor to use
73       * @param selectorProvider The Selector provider to use
74       */
75      public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
76          super(executor);
77  
78          try {
79              // Open a new selector
80              if (selectorProvider == null) {
81                  selector = Selector.open();
82              } else {
83                  selector = selectorProvider.openSelector();
84              }
85  
86          } catch (IOException e) {
87              throw new RuntimeIoException("Failed to open a selector.", e);
88          }
89      }
90  
91      @Override
92      protected void doDispose() throws Exception {
93          selector.close();
94      }
95  
96      @Override
97      protected int select(long timeout) throws Exception {
98          return selector.select(timeout);
99      }
100 
101     @Override
102     protected int select() throws Exception {
103         return selector.select();
104     }
105 
106     @Override
107     protected boolean isSelectorEmpty() {
108         return selector.keys().isEmpty();
109     }
110 
111     @Override
112     protected void wakeup() {
113         wakeupCalled.getAndSet(true);
114         selector.wakeup();
115     }
116 
117     @Override
118     protected Iterator<NioSession> allSessions() {
119         return new IoSessionIterator(selector.keys());
120     }
121 
122     @SuppressWarnings("synthetic-access")
123     @Override
124     protected Iterator<NioSession> selectedSessions() {
125         return new IoSessionIterator(selector.selectedKeys());
126     }
127 
128     @Override
129     protected void init(NioSession session) throws Exception {
130         SelectableChannel ch = (SelectableChannel) session.getChannel();
131         ch.configureBlocking(false);
132         session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
133     }
134 
135     @Override
136     protected void destroy(NioSession session) throws Exception {
137         ByteChannel ch = session.getChannel();
138         
139         SelectionKey key = session.getSelectionKey();
140         
141         if (key != null) {
142             key.cancel();
143         }
144         
145         if ( ch.isOpen() ) {
146             ch.close();
147         }
148     }
149 
150     /**
151      * In the case we are using the java select() method, this method is used to
152      * trash the buggy selector and create a new one, registering all the
153      * sockets on it.
154      */
155     @Override
156     protected void registerNewSelector() throws IOException {
157         synchronized (selector) {
158             Set<SelectionKey> keys = selector.keys();
159 
160             // Open a new selector
161             Selector newSelector = null;
162 
163             if (selectorProvider == null) {
164                 newSelector = Selector.open();
165             } else {
166                 newSelector = selectorProvider.openSelector();
167             }
168 
169             // Loop on all the registered keys, and register them on the new selector
170             for (SelectionKey key : keys) {
171                 SelectableChannel ch = key.channel();
172 
173                 // Don't forget to attache the session, and back !
174                 NioSession session = (NioSession) key.attachment();
175                 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
176                 session.setSelectionKey(newKey);
177             }
178 
179             // Now we can close the old selector and switch it
180             selector.close();
181             selector = newSelector;
182         }
183     }
184 
185     /**
186      * {@inheritDoc}
187      */
188     @Override
189     protected boolean isBrokenConnection() throws IOException {
190         // A flag set to true if we find a broken session
191         boolean brokenSession = false;
192 
193         synchronized (selector) {
194             // Get the selector keys
195             Set<SelectionKey> keys = selector.keys();
196 
197             // Loop on all the keys to see if one of them
198             // has a closed channel
199             for (SelectionKey key : keys) {
200                 SelectableChannel channel = key.channel();
201 
202                 if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected()))
203                         || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
204                     // The channel is not connected anymore. Cancel
205                     // the associated key then.
206                     key.cancel();
207 
208                     // Set the flag to true to avoid a selector switch
209                     brokenSession = true;
210                 }
211             }
212         }
213 
214         return brokenSession;
215     }
216 
217     /**
218      * {@inheritDoc}
219      */
220     @Override
221     protected SessionState getState(NioSession session) {
222         SelectionKey key = session.getSelectionKey();
223 
224         if (key == null) {
225             // The channel is not yet registred to a selector
226             return SessionState.OPENING;
227         }
228 
229         if (key.isValid()) {
230             // The session is opened
231             return SessionState.OPENED;
232         } else {
233             // The session still as to be closed
234             return SessionState.CLOSING;
235         }
236     }
237 
238     @Override
239     protected boolean isReadable(NioSession session) {
240         SelectionKey key = session.getSelectionKey();
241 
242         return (key != null) && key.isValid() && key.isReadable();
243     }
244 
245     @Override
246     protected boolean isWritable(NioSession session) {
247         SelectionKey key = session.getSelectionKey();
248 
249         return (key != null) && key.isValid() && key.isWritable();
250     }
251 
252     @Override
253     protected boolean isInterestedInRead(NioSession session) {
254         SelectionKey key = session.getSelectionKey();
255 
256         return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
257     }
258 
259     @Override
260     protected boolean isInterestedInWrite(NioSession session) {
261         SelectionKey key = session.getSelectionKey();
262 
263         return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
264     }
265 
266     /**
267      * {@inheritDoc}
268      */
269     @Override
270     protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
271         SelectionKey key = session.getSelectionKey();
272 
273         if ((key == null) || !key.isValid()) {
274             return;
275         }
276 
277         int oldInterestOps = key.interestOps();
278         int newInterestOps = oldInterestOps;
279 
280         if (isInterested) {
281             newInterestOps |= SelectionKey.OP_READ;
282         } else {
283             newInterestOps &= ~SelectionKey.OP_READ;
284         }
285 
286         if (oldInterestOps != newInterestOps) {
287             key.interestOps(newInterestOps);
288         }
289     }
290 
291     /**
292      * {@inheritDoc}
293      */
294     @Override
295     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
296         SelectionKey key = session.getSelectionKey();
297 
298         if ((key == null) || !key.isValid()) {
299             return;
300         }
301 
302         int newInterestOps = key.interestOps();
303 
304         if (isInterested) {
305             newInterestOps |= SelectionKey.OP_WRITE;
306         } else {
307             newInterestOps &= ~SelectionKey.OP_WRITE;
308         }
309 
310         key.interestOps(newInterestOps);
311     }
312 
313     @Override
314     protected int read(NioSession session, IoBuffer buf) throws Exception {
315         ByteChannel channel = session.getChannel();
316 
317         return channel.read(buf.buf());
318     }
319 
320     @Override
321     protected int write(NioSession session, IoBuffer buf, int length) throws IOException {
322         if (buf.remaining() <= length) {
323             return session.getChannel().write(buf.buf());
324         }
325 
326         int oldLimit = buf.limit();
327         buf.limit(buf.position() + length);
328         try {
329             return session.getChannel().write(buf.buf());
330         } finally {
331             buf.limit(oldLimit);
332         }
333     }
334 
335     @Override
336     protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
337         try {
338             return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
339         } catch (IOException e) {
340             // Check to see if the IOException is being thrown due to
341             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
342             String message = e.getMessage();
343             if ((message != null) && message.contains("temporarily unavailable")) {
344                 return 0;
345             }
346 
347             throw e;
348         }
349     }
350 
351     /**
352      * An encapsulating iterator around the {@link Selector#selectedKeys()} or
353      * the {@link Selector#keys()} iterator;
354      */
355     protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
356         private final Iterator<SelectionKey> iterator;
357 
358         /**
359          * Create this iterator as a wrapper on top of the selectionKey Set.
360          *
361          * @param keys
362          *            The set of selected sessions
363          */
364         private IoSessionIterator(Set<SelectionKey> keys) {
365             iterator = keys.iterator();
366         }
367 
368         /**
369          * {@inheritDoc}
370          */
371         public boolean hasNext() {
372             return iterator.hasNext();
373         }
374 
375         /**
376          * {@inheritDoc}
377          */
378         public NioSession next() {
379             SelectionKey key = iterator.next();
380             NioSession nioSession = (NioSession) key.attachment();
381             return nioSession;
382         }
383 
384         /**
385          * {@inheritDoc}
386          */
387         public void remove() {
388             iterator.remove();
389         }
390     }
391 }