View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.ByteChannel;
24  import java.nio.channels.DatagramChannel;
25  import java.nio.channels.SelectableChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.nio.channels.spi.SelectorProvider;
30  import java.util.Iterator;
31  import java.util.Set;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.mina.core.RuntimeIoException;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.FileRegion;
37  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
38  import org.apache.mina.core.session.SessionState;
39  
40  /**
41   * TODO Add documentation
42   *
43   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
44   */
45  public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
46      /** The selector associated with this processor */
47      private Selector selector;
48  
49      private SelectorProvider selectorProvider = null;
50  
51      /**
52       *
53       * Creates a new instance of NioProcessor.
54       *
55       * @param executor The executor to use
56       */
57      public NioProcessor(Executor executor) {
58          super(executor);
59  
60          try {
61              // Open a new selector
62              selector = Selector.open();
63          } catch (IOException e) {
64              throw new RuntimeIoException("Failed to open a selector.", e);
65          }
66      }
67  
68      /**
69       *
70       * Creates a new instance of NioProcessor.
71       *
72       * @param executor The executor to use
73       * @param selectorProvider The Selector provider to use
74       */
75      public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
76          super(executor);
77  
78          try {
79              // Open a new selector
80              if (selectorProvider == null) {
81                  selector = Selector.open();
82              } else {
83                  selector = selectorProvider.openSelector();
84              }
85  
86          } catch (IOException e) {
87              throw new RuntimeIoException("Failed to open a selector.", e);
88          }
89      }
90  
91      @Override
92      protected void doDispose() throws Exception {
93          selector.close();
94      }
95  
96      @Override
97      protected int select(long timeout) throws Exception {
98          return selector.select(timeout);
99      }
100 
101     @Override
102     protected int select() throws Exception {
103         return selector.select();
104     }
105 
106     @Override
107     protected boolean isSelectorEmpty() {
108         return selector.keys().isEmpty();
109     }
110 
111     @Override
112     protected void wakeup() {
113         wakeupCalled.getAndSet(true);
114         selector.wakeup();
115     }
116 
117     @Override
118     protected Iterator<NioSession> allSessions() {
119         return new IoSessionIterator(selector.keys());
120     }
121 
122     @SuppressWarnings("synthetic-access")
123     @Override
124     protected Iterator<NioSession> selectedSessions() {
125         return new IoSessionIterator(selector.selectedKeys());
126     }
127 
128     @Override
129     protected void init(NioSession session) throws Exception {
130         SelectableChannel ch = (SelectableChannel) session.getChannel();
131         ch.configureBlocking(false);
132         session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
133     }
134 
135     @Override
136     protected void destroy(NioSession session) throws Exception {
137         ByteChannel ch = session.getChannel();
138         SelectionKey key = session.getSelectionKey();
139         if (key != null) {
140             key.cancel();
141         }
142         ch.close();
143     }
144 
145     /**
146      * In the case we are using the java select() method, this method is used to
147      * trash the buggy selector and create a new one, registering all the
148      * sockets on it.
149      */
150     @Override
151     protected void registerNewSelector() throws IOException {
152         synchronized (selector) {
153             Set<SelectionKey> keys = selector.keys();
154 
155             // Open a new selector
156             Selector newSelector = null;
157 
158             if (selectorProvider == null) {
159                 newSelector = Selector.open();
160             } else {
161                 newSelector = selectorProvider.openSelector();
162             }
163 
164             // Loop on all the registered keys, and register them on the new selector
165             for (SelectionKey key : keys) {
166                 SelectableChannel ch = key.channel();
167 
168                 // Don't forget to attache the session, and back !
169                 NioSession session = (NioSession) key.attachment();
170                 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
171                 session.setSelectionKey(newKey);
172             }
173 
174             // Now we can close the old selector and switch it
175             selector.close();
176             selector = newSelector;
177         }
178     }
179 
180     /**
181      * {@inheritDoc}
182      */
183     @Override
184     protected boolean isBrokenConnection() throws IOException {
185         // A flag set to true if we find a broken session
186         boolean brokenSession = false;
187 
188         synchronized (selector) {
189             // Get the selector keys
190             Set<SelectionKey> keys = selector.keys();
191 
192             // Loop on all the keys to see if one of them
193             // has a closed channel
194             for (SelectionKey key : keys) {
195                 SelectableChannel channel = key.channel();
196 
197                 if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected()))
198                         || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
199                     // The channel is not connected anymore. Cancel
200                     // the associated key then.
201                     key.cancel();
202 
203                     // Set the flag to true to avoid a selector switch
204                     brokenSession = true;
205                 }
206             }
207         }
208 
209         return brokenSession;
210     }
211 
212     /**
213      * {@inheritDoc}
214      */
215     @Override
216     protected SessionState getState(NioSession session) {
217         SelectionKey key = session.getSelectionKey();
218 
219         if (key == null) {
220             // The channel is not yet registred to a selector
221             return SessionState.OPENING;
222         }
223 
224         if (key.isValid()) {
225             // The session is opened
226             return SessionState.OPENED;
227         } else {
228             // The session still as to be closed
229             return SessionState.CLOSING;
230         }
231     }
232 
233     @Override
234     protected boolean isReadable(NioSession session) {
235         SelectionKey key = session.getSelectionKey();
236 
237         return (key != null) && key.isValid() && key.isReadable();
238     }
239 
240     @Override
241     protected boolean isWritable(NioSession session) {
242         SelectionKey key = session.getSelectionKey();
243 
244         return (key != null) && key.isValid() && key.isWritable();
245     }
246 
247     @Override
248     protected boolean isInterestedInRead(NioSession session) {
249         SelectionKey key = session.getSelectionKey();
250 
251         return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
252     }
253 
254     @Override
255     protected boolean isInterestedInWrite(NioSession session) {
256         SelectionKey key = session.getSelectionKey();
257 
258         return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
259     }
260 
261     /**
262      * {@inheritDoc}
263      */
264     @Override
265     protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
266         SelectionKey key = session.getSelectionKey();
267 
268         if ((key == null) || !key.isValid()) {
269             return;
270         }
271 
272         int oldInterestOps = key.interestOps();
273         int newInterestOps = oldInterestOps;
274 
275         if (isInterested) {
276             newInterestOps |= SelectionKey.OP_READ;
277         } else {
278             newInterestOps &= ~SelectionKey.OP_READ;
279         }
280 
281         if (oldInterestOps != newInterestOps) {
282             key.interestOps(newInterestOps);
283         }
284     }
285 
286     /**
287      * {@inheritDoc}
288      */
289     @Override
290     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
291         SelectionKey key = session.getSelectionKey();
292 
293         if ((key == null) || !key.isValid()) {
294             return;
295         }
296 
297         int newInterestOps = key.interestOps();
298 
299         if (isInterested) {
300             newInterestOps |= SelectionKey.OP_WRITE;
301         } else {
302             newInterestOps &= ~SelectionKey.OP_WRITE;
303         }
304 
305         key.interestOps(newInterestOps);
306     }
307 
308     @Override
309     protected int read(NioSession session, IoBuffer buf) throws Exception {
310         ByteChannel channel = session.getChannel();
311 
312         return channel.read(buf.buf());
313     }
314 
315     @Override
316     protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
317         if (buf.remaining() <= length) {
318             return session.getChannel().write(buf.buf());
319         }
320 
321         int oldLimit = buf.limit();
322         buf.limit(buf.position() + length);
323         try {
324             return session.getChannel().write(buf.buf());
325         } finally {
326             buf.limit(oldLimit);
327         }
328     }
329 
330     @Override
331     protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
332         try {
333             return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
334         } catch (IOException e) {
335             // Check to see if the IOException is being thrown due to
336             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
337             String message = e.getMessage();
338             if ((message != null) && message.contains("temporarily unavailable")) {
339                 return 0;
340             }
341 
342             throw e;
343         }
344     }
345 
346     /**
347      * An encapsulating iterator around the {@link Selector#selectedKeys()} or
348      * the {@link Selector#keys()} iterator;
349      */
350     protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
351         private final Iterator<SelectionKey> iterator;
352 
353         /**
354          * Create this iterator as a wrapper on top of the selectionKey Set.
355          *
356          * @param keys
357          *            The set of selected sessions
358          */
359         private IoSessionIterator(Set<SelectionKey> keys) {
360             iterator = keys.iterator();
361         }
362 
363         /**
364          * {@inheritDoc}
365          */
366         public boolean hasNext() {
367             return iterator.hasNext();
368         }
369 
370         /**
371          * {@inheritDoc}
372          */
373         public NioSession next() {
374             SelectionKey key = iterator.next();
375             NioSession nioSession = (NioSession) key.attachment();
376             return nioSession;
377         }
378 
379         /**
380          * {@inheritDoc}
381          */
382         public void remove() {
383             iterator.remove();
384         }
385     }
386 }