View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.nio.channels.ByteChannel;
24  import java.nio.channels.DatagramChannel;
25  import java.nio.channels.SelectableChannel;
26  import java.nio.channels.SelectionKey;
27  import java.nio.channels.Selector;
28  import java.nio.channels.SocketChannel;
29  import java.util.Iterator;
30  import java.util.Set;
31  import java.util.concurrent.Executor;
32  
33  import org.apache.mina.core.RuntimeIoException;
34  import org.apache.mina.core.buffer.IoBuffer;
35  import org.apache.mina.core.file.FileRegion;
36  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
37  import org.apache.mina.core.session.SessionState;
38  
39  /**
40   * TODO Add documentation
41   *
42   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
43   */
44  public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
45      /** The selector associated with this processor */
46      private Selector selector;
47  
48      /**
49       *
50       * Creates a new instance of NioProcessor.
51       *
52       * @param executor
53       */
54      public NioProcessor(Executor executor) {
55          super(executor);
56  
57          try {
58              // Open a new selector
59              selector = Selector.open();
60          } catch (IOException e) {
61              throw new RuntimeIoException("Failed to open a selector.", e);
62          }
63      }
64  
65      @Override
66      protected void doDispose() throws Exception {
67          selector.close();
68      }
69  
70      @Override
71      protected int select(long timeout) throws Exception {
72          return selector.select(timeout);
73      }
74  
75      @Override
76      protected int select() throws Exception {
77          return selector.select();
78      }
79  
80      @Override
81      protected boolean isSelectorEmpty() {
82          return selector.keys().isEmpty();
83      }
84  
85      @Override
86      protected void wakeup() {
87          wakeupCalled.getAndSet(true);
88          selector.wakeup();
89      }
90  
91      @Override
92      protected Iterator<NioSession> allSessions() {
93          return new IoSessionIterator(selector.keys());
94      }
95  
96      @SuppressWarnings("synthetic-access")
97      @Override
98      protected Iterator<NioSession> selectedSessions() {
99          return new IoSessionIterator(selector.selectedKeys());
100     }
101 
102     @Override
103     protected void init(NioSession session) throws Exception {
104         SelectableChannel ch = (SelectableChannel) session.getChannel();
105         ch.configureBlocking(false);
106         session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
107     }
108 
109     @Override
110     protected void destroy(NioSession session) throws Exception {
111         ByteChannel ch = session.getChannel();
112         SelectionKey key = session.getSelectionKey();
113         if (key != null) {
114             key.cancel();
115         }
116         ch.close();
117     }
118 
119     /**
120      * In the case we are using the java select() method, this method is used to
121      * trash the buggy selector and create a new one, registering all the
122      * sockets on it.
123      */
124     @Override
125     protected void registerNewSelector() throws IOException {
126         synchronized (selector) {
127             Set<SelectionKey> keys = selector.keys();
128 
129             // Open a new selector
130             Selector newSelector = Selector.open();
131 
132             // Loop on all the registered keys, and register them on the new selector
133             for (SelectionKey key : keys) {
134                 SelectableChannel ch = key.channel();
135 
136                 // Don't forget to attache the session, and back !
137                 NioSession session = (NioSession) key.attachment();
138                 SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
139                 session.setSelectionKey(newKey);
140             }
141 
142             // Now we can close the old selector and switch it
143             selector.close();
144             selector = newSelector;
145         }
146     }
147 
148     /**
149      * {@inheritDoc}
150      */
151     @Override
152     protected boolean isBrokenConnection() throws IOException {
153         // A flag set to true if we find a broken session
154         boolean brokenSession = false;
155 
156         synchronized (selector) {
157             // Get the selector keys
158             Set<SelectionKey> keys = selector.keys();
159 
160             // Loop on all the keys to see if one of them
161             // has a closed channel
162             for (SelectionKey key : keys) {
163                 SelectableChannel channel = key.channel();
164 
165                 if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected()))
166                         || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
167                     // The channel is not connected anymore. Cancel
168                     // the associated key then.
169                     key.cancel();
170 
171                     // Set the flag to true to avoid a selector switch
172                     brokenSession = true;
173                 }
174             }
175         }
176 
177         return brokenSession;
178     }
179 
180     /**
181      * {@inheritDoc}
182      */
183     @Override
184     protected SessionState getState(NioSession session) {
185         SelectionKey key = session.getSelectionKey();
186 
187         if (key == null) {
188             // The channel is not yet registred to a selector
189             return SessionState.OPENING;
190         }
191 
192         if (key.isValid()) {
193             // The session is opened
194             return SessionState.OPENED;
195         } else {
196             // The session still as to be closed
197             return SessionState.CLOSING;
198         }
199     }
200 
201     @Override
202     protected boolean isReadable(NioSession session) {
203         SelectionKey key = session.getSelectionKey();
204         return key.isValid() && key.isReadable();
205     }
206 
207     @Override
208     protected boolean isWritable(NioSession session) {
209         SelectionKey key = session.getSelectionKey();
210         return key.isValid() && key.isWritable();
211     }
212 
213     @Override
214     protected boolean isInterestedInRead(NioSession session) {
215         SelectionKey key = session.getSelectionKey();
216         return key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
217     }
218 
219     @Override
220     protected boolean isInterestedInWrite(NioSession session) {
221         SelectionKey key = session.getSelectionKey();
222         return key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
223     }
224 
225     /**
226      * {@inheritDoc}
227      */
228     @Override
229     protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
230         SelectionKey key = session.getSelectionKey();
231         int oldInterestOps = key.interestOps();
232         int newInterestOps = oldInterestOps;
233 
234         if (isInterested) {
235             newInterestOps |= SelectionKey.OP_READ;
236         } else {
237             newInterestOps &= ~SelectionKey.OP_READ;
238         }
239 
240         if (oldInterestOps != newInterestOps) {
241             key.interestOps(newInterestOps);
242         }
243     }
244 
245     /**
246      * {@inheritDoc}
247      */
248     @Override
249     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
250         SelectionKey key = session.getSelectionKey();
251 
252         if (key == null) {
253             return;
254         }
255 
256         int newInterestOps = key.interestOps();
257 
258         if (isInterested) {
259             newInterestOps |= SelectionKey.OP_WRITE;
260             //newInterestOps &= ~SelectionKey.OP_READ;
261         } else {
262             newInterestOps &= ~SelectionKey.OP_WRITE;
263             //newInterestOps |= SelectionKey.OP_READ;
264         }
265 
266         key.interestOps(newInterestOps);
267     }
268 
269     @Override
270     protected int read(NioSession session, IoBuffer buf) throws Exception {
271         ByteChannel channel = session.getChannel();
272 
273         return channel.read(buf.buf());
274     }
275 
276     @Override
277     protected int write(NioSession session, IoBuffer buf, int length) throws Exception {
278         if (buf.remaining() <= length) {
279             return session.getChannel().write(buf.buf());
280         }
281 
282         int oldLimit = buf.limit();
283         buf.limit(buf.position() + length);
284         try {
285             return session.getChannel().write(buf.buf());
286         } finally {
287             buf.limit(oldLimit);
288         }
289     }
290 
291     @Override
292     protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
293         try {
294             return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
295         } catch (IOException e) {
296             // Check to see if the IOException is being thrown due to
297             // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
298             String message = e.getMessage();
299             if ((message != null) && message.contains("temporarily unavailable")) {
300                 return 0;
301             }
302 
303             throw e;
304         }
305     }
306 
307     /**
308      * An encapsulating iterator around the {@link Selector#selectedKeys()} or
309      * the {@link Selector#keys()} iterator;
310      */
311     protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
312         private final Iterator<SelectionKey> iterator;
313 
314         /**
315          * Create this iterator as a wrapper on top of the selectionKey Set.
316          *
317          * @param keys
318          *            The set of selected sessions
319          */
320         private IoSessionIterator(Set<SelectionKey> keys) {
321             iterator = keys.iterator();
322         }
323 
324         /**
325          * {@inheritDoc}
326          */
327         public boolean hasNext() {
328             return iterator.hasNext();
329         }
330 
331         /**
332          * {@inheritDoc}
333          */
334         public NioSession next() {
335             SelectionKey key = iterator.next();
336             NioSession nioSession = (NioSession) key.attachment();
337             return nioSession;
338         }
339 
340         /**
341          * {@inheritDoc}
342          */
343         public void remove() {
344             iterator.remove();
345         }
346     }
347 }