View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.ByteChannel;
24  import java.nio.channels.DatagramChannel;
25  import java.nio.channels.SelectableChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.nio.channels.spi.SelectorProvider;
30  import java.util.Iterator;
31  import java.util.Set;
32  import java.util.concurrent.Executor;
33  
34  import org.apache.mina.core.RuntimeIoException;
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.FileRegion;
37  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
38  import org.apache.mina.core.session.SessionState;
39  
40  /**
41   * TODO Add documentation
42   *
43   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
44   */
45  public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
46      /** The selector associated with this processor */
47      private Selector selector;
48  
49      private SelectorProvider selectorProvider = null;
50  
51      /**
52       *
53       * Creates a new instance of NioProcessor.
54       *
55       * @param executor
56       */
57      public NioProcessor(Executor executor) {
58          super(executor);
59  
60          try {
61              // Open a new selector
62              selector = Selector.open();
63          } catch (IOException e) {
64              throw new RuntimeIoException("Failed to open a selector.", e);
65          }
66      }
67  
68      /**
69       *
70       * Creates a new instance of NioProcessor.
71       *
72       * @param executor
73       */
74      public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
75          super(executor);
76  
77          try {
78              // Open a new selector
79              if (selectorProvider == null) {
80                  selector = Selector.open();
81              } else {
82                  selector = selectorProvider.openSelector();
83              }
84  
85          } catch (IOException e) {
86              throw new RuntimeIoException("Failed to open a selector.", e);
87          }
88      }
89  
90      @Override
91      protected void doDispose() throws Exception {
92          selector.close();
93      }
94  
95      @Override
96      protected int select(long timeout) throws Exception {
97          return selector.select(timeout);
98      }
99  
100     @Override
101     protected int select() throws Exception {
102         return selector.select();
103     }
104 
105     @Override
106     protected boolean isSelectorEmpty() {
107         return selector.keys().isEmpty();
108     }
109 
110     @Override
111     protected void wakeup() {
112         wakeupCalled.getAndSet(true);
113         selector.wakeup();
114     }
115 
116     @Override
117     protected Iterator<NioSession> allSessions() {
118         return new IoSessionIterator(selector.keys());
119     }
120 
121     @SuppressWarnings("synthetic-access")
122     @Override
123     protected Iterator<NioSession> selectedSessions() {
124         return new IoSessionIterator(selector.selectedKeys());
125     }
126 
127     @Override
128     protected void init(NioSession session) throws Exception {
129         SelectableChannel ch = (SelectableChannel) session.getChannel();
130         ch.configureBlocking(false);
131         session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
132     }
133 
134     @Override
135     protected void destroy(NioSession session) throws Exception {
136         ByteChannel ch = session.getChannel();
137         SelectionKey key = session.getSelectionKey();
138         if (key != null) {
139             key.cancel();
140         }
141         ch.close();
142     }
143 
144     /**
145      * In the case we are using the java select() method, this method is used to
146      * trash the buggy selector and create a new one, registering all the
147      * sockets on it.
148      */
149     @Override
150     protected void registerNewSelector() throws IOException {
151         synchronized (selector) {
152             Set<SelectionKey> keys = selector.keys();
153 
154             // Open a new selector
155             Selector newSelector = null;
156 
157             if (selectorProvider == null) {
158                 newSelector = Selector.open();
159             } else {
160                 newSelector = selectorProvider.openSelector();
161             }
162 
163             // Loop on all the registered keys, and register them on the new selector
164             for (SelectionKey key : keys) {
165                 SelectableChannel ch = key.channel();
166 
167                 // Don't forget to attache the session, and back !
168                 NioSession session = (NioSession) key.attachment();
169                 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
170                 session.setSelectionKey(newKey);
171             }
172 
173             // Now we can close the old selector and switch it
174             selector.close();
175             selector = newSelector;
176         }
177     }
178 
179     /**
180      * {@inheritDoc}
181      */
182     @Override
183     protected boolean isBrokenConnection() throws IOException {
184         // A flag set to true if we find a broken session
185         boolean brokenSession = false;
186 
187         synchronized (selector) {
188             // Get the selector keys
189             Set<SelectionKey> keys = selector.keys();
190 
191             // Loop on all the keys to see if one of them
192             // has a closed channel
193             for (SelectionKey key : keys) {
194                 SelectableChannel channel = key.channel();
195 
196                 if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected()))
197                         || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
198                     // The channel is not connected anymore. Cancel
199                     // the associated key then.
200                     key.cancel();
201 
202                     // Set the flag to true to avoid a selector switch
203                     brokenSession = true;
204                 }
205             }
206         }
207 
208         return brokenSession;
209     }
210 
211     /**
212      * {@inheritDoc}
213      */
214     @Override
215     protected SessionState getState(NioSession session) {
216         SelectionKey key = session.getSelectionKey();
217 
218         if (key == null) {
219             // The channel is not yet registred to a selector
220             return SessionState.OPENING;
221         }
222 
223         if (key.isValid()) {
224             // The session is opened
225             return SessionState.OPENED;
226         } else {
227             // The session still as to be closed
228             return SessionState.CLOSING;
229         }
230     }
231 
232     @Override
233     protected boolean isReadable(NioSession session) {
234         SelectionKey key = session.getSelectionKey();
235 
236         return (key != null) && key.isValid() && key.isReadable();
237     }
238 
239     @Override
240     protected boolean isWritable(NioSession session) {
241         SelectionKey key = session.getSelectionKey();
242 
243         return (key != null) && key.isValid() && key.isWritable();
244     }
245 
246     @Override
247     protected boolean isInterestedInRead(NioSession session) {
248         SelectionKey key = session.getSelectionKey();
249 
250         return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
251     }
252 
253     @Override
254     protected boolean isInterestedInWrite(NioSession session) {
255         SelectionKey key = session.getSelectionKey();
256 
257         return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
258     }
259 
260     /**
261      * {@inheritDoc}
262      */
263     @Override
264     protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
265         SelectionKey key = session.getSelectionKey();
266 
267         if ((key == null) || !key.isValid()) {
268             return;
269         }
270 
271         int oldInterestOps = key.interestOps();
272         int newInterestOps = oldInterestOps;
273 
274         if (isInterested) {
275             newInterestOps |= SelectionKey.OP_READ;
276         } else {
277             newInterestOps &= ~SelectionKey.OP_READ;
278         }
279 
280         if (oldInterestOps != newInterestOps) {
281             key.interestOps(newInterestOps);
282         }
283     }
284 
285     /**
286      * {@inheritDoc}
287      */
288     @Override
289     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
290         SelectionKey key = session.getSelectionKey();
291 
292         if ((key == null) || !key.isValid()) {
293             return;
294         }
295 
296         int newInterestOps = key.interestOps();
297 
298         if (isInterested) {
299             newInterestOps |= SelectionKey.OP_WRITE;
300         } else {
301             newInterestOps &= ~SelectionKey.OP_WRITE;
302         }
303 
304         key.interestOps(newInterestOps);
305     }
306 
307     @Override
308     protected int read(NioSession session, IoBuffer buf) throws Exception {
309         ByteChannel channel = session.getChannel();
310 
311         return channel.read(buf.buf());
312     }
313 
314     @Override
315     protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
316         if (buf.remaining() <= length) {
317             return session.getChannel().write(buf.buf());
318         }
319 
320         int oldLimit = buf.limit();
321         buf.limit(buf.position() + length);
322         try {
323             return session.getChannel().write(buf.buf());
324         } finally {
325             buf.limit(oldLimit);
326         }
327     }
328 
329     @Override
330     protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
331         try {
332             return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
333         } catch (IOException e) {
334             // Check to see if the IOException is being thrown due to
335             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
336             String message = e.getMessage();
337             if ((message != null) && message.contains("temporarily unavailable")) {
338                 return 0;
339             }
340 
341             throw e;
342         }
343     }
344 
345     /**
346      * An encapsulating iterator around the {@link Selector#selectedKeys()} or
347      * the {@link Selector#keys()} iterator;
348      */
349     protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
350         private final Iterator<SelectionKey> iterator;
351 
352         /**
353          * Create this iterator as a wrapper on top of the selectionKey Set.
354          *
355          * @param keys
356          *            The set of selected sessions
357          */
358         private IoSessionIterator(Set<SelectionKey> keys) {
359             iterator = keys.iterator();
360         }
361 
362         /**
363          * {@inheritDoc}
364          */
365         public boolean hasNext() {
366             return iterator.hasNext();
367         }
368 
369         /**
370          * {@inheritDoc}
371          */
372         public NioSession next() {
373             SelectionKey key = iterator.next();
374             NioSession nioSession = (NioSession) key.attachment();
375             return nioSession;
376         }
377 
378         /**
379          * {@inheritDoc}
380          */
381         public void remove() {
382             iterator.remove();
383         }
384     }
385 }