001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.transport.socket.nio;
021
022import java.io.IOException;
023import java.nio.channels.ByteChannel;
024import java.nio.channels.DatagramChannel;
025import java.nio.channels.SelectableChannel;
026import java.nio.channels.SelectionKey;
027import java.nio.channels.Selector;
028import java.nio.channels.SocketChannel;
029import java.nio.channels.spi.SelectorProvider;
030import java.util.Iterator;
031import java.util.Set;
032import java.util.concurrent.Executor;
033
034import org.apache.mina.core.RuntimeIoException;
035import org.apache.mina.core.buffer.IoBuffer;
036import org.apache.mina.core.file.FileRegion;
037import org.apache.mina.core.polling.AbstractPollingIoProcessor;
038import org.apache.mina.core.session.SessionState;
039
040/**
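 * An {@link AbstractPollingIoProcessor} for {@link NioSession}s which polls
 * the sessions' channels with a java.nio {@link Selector}.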
042 *
043 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
044 */
045public final class NioProcessor extends AbstractPollingIoProcessor<NioSession> {
046    /** The selector associated with this processor */
047    private Selector selector;
048
049    private SelectorProvider selectorProvider = null;
050
051    /**
052     *
053     * Creates a new instance of NioProcessor.
054     *
055     * @param executor The executor to use
056     */
057    public NioProcessor(Executor executor) {
058        super(executor);
059
060        try {
061            // Open a new selector
062            selector = Selector.open();
063        } catch (IOException e) {
064            throw new RuntimeIoException("Failed to open a selector.", e);
065        }
066    }
067
068    /**
069     *
070     * Creates a new instance of NioProcessor.
071     *
072     * @param executor The executor to use
073     * @param selectorProvider The Selector provider to use
074     */
075    public NioProcessor(Executor executor, SelectorProvider selectorProvider) {
076        super(executor);
077
078        try {
079            // Open a new selector
080            if (selectorProvider == null) {
081                selector = Selector.open();
082            } else {
083                selector = selectorProvider.openSelector();
084            }
085
086        } catch (IOException e) {
087            throw new RuntimeIoException("Failed to open a selector.", e);
088        }
089    }
090
091    @Override
092    protected void doDispose() throws Exception {
093        selector.close();
094    }
095
096    @Override
097    protected int select(long timeout) throws Exception {
098        return selector.select(timeout);
099    }
100
101    @Override
102    protected int select() throws Exception {
103        return selector.select();
104    }
105
106    @Override
107    protected boolean isSelectorEmpty() {
108        return selector.keys().isEmpty();
109    }
110
111    @Override
112    protected void wakeup() {
113        wakeupCalled.getAndSet(true);
114        selector.wakeup();
115    }
116
117    @Override
118    protected Iterator<NioSession> allSessions() {
119        return new IoSessionIterator(selector.keys());
120    }
121
122    @SuppressWarnings("synthetic-access")
123    @Override
124    protected Iterator<NioSession> selectedSessions() {
125        return new IoSessionIterator(selector.selectedKeys());
126    }
127
128    @Override
129    protected void init(NioSession session) throws Exception {
130        SelectableChannel ch = (SelectableChannel) session.getChannel();
131        ch.configureBlocking(false);
132        session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
133    }
134
135    @Override
136    protected void destroy(NioSession session) throws Exception {
137        ByteChannel ch = session.getChannel();
138        
139        SelectionKey key = session.getSelectionKey();
140        
141        if (key != null) {
142            key.cancel();
143        }
144        
145        if ( ch.isOpen() ) {
146            ch.close();
147        }
148    }
149
150    /**
151     * In the case we are using the java select() method, this method is used to
152     * trash the buggy selector and create a new one, registering all the
153     * sockets on it.
154     */
155    @Override
156    protected void registerNewSelector() throws IOException {
157        synchronized (selector) {
158            Set<SelectionKey> keys = selector.keys();
159
160            // Open a new selector
161            Selector newSelector = null;
162
163            if (selectorProvider == null) {
164                newSelector = Selector.open();
165            } else {
166                newSelector = selectorProvider.openSelector();
167            }
168
169            // Loop on all the registered keys, and register them on the new selector
170            for (SelectionKey key : keys) {
171                SelectableChannel ch = key.channel();
172
173                // Don't forget to attache the session, and back !
174                NioSession session = (NioSession) key.attachment();
175                SelectionKey newKey = ch.register(newSelector, key.interestOps(), session);
176                session.setSelectionKey(newKey);
177            }
178
179            // Now we can close the old selector and switch it
180            selector.close();
181            selector = newSelector;
182        }
183    }
184
185    /**
186     * {@inheritDoc}
187     */
188    @Override
189    protected boolean isBrokenConnection() throws IOException {
190        // A flag set to true if we find a broken session
191        boolean brokenSession = false;
192
193        synchronized (selector) {
194            // Get the selector keys
195            Set<SelectionKey> keys = selector.keys();
196
197            // Loop on all the keys to see if one of them
198            // has a closed channel
199            for (SelectionKey key : keys) {
200                SelectableChannel channel = key.channel();
201
202                if ((((channel instanceof DatagramChannel) && !((DatagramChannel) channel).isConnected()))
203                        || ((channel instanceof SocketChannel) && !((SocketChannel) channel).isConnected())) {
204                    // The channel is not connected anymore. Cancel
205                    // the associated key then.
206                    key.cancel();
207
208                    // Set the flag to true to avoid a selector switch
209                    brokenSession = true;
210                }
211            }
212        }
213
214        return brokenSession;
215    }
216
217    /**
218     * {@inheritDoc}
219     */
220    @Override
221    protected SessionState getState(NioSession session) {
222        SelectionKey key = session.getSelectionKey();
223
224        if (key == null) {
225            // The channel is not yet registred to a selector
226            return SessionState.OPENING;
227        }
228
229        if (key.isValid()) {
230            // The session is opened
231            return SessionState.OPENED;
232        } else {
233            // The session still as to be closed
234            return SessionState.CLOSING;
235        }
236    }
237
238    @Override
239    protected boolean isReadable(NioSession session) {
240        SelectionKey key = session.getSelectionKey();
241
242        return (key != null) && key.isValid() && key.isReadable();
243    }
244
245    @Override
246    protected boolean isWritable(NioSession session) {
247        SelectionKey key = session.getSelectionKey();
248
249        return (key != null) && key.isValid() && key.isWritable();
250    }
251
252    @Override
253    protected boolean isInterestedInRead(NioSession session) {
254        SelectionKey key = session.getSelectionKey();
255
256        return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_READ) != 0);
257    }
258
259    @Override
260    protected boolean isInterestedInWrite(NioSession session) {
261        SelectionKey key = session.getSelectionKey();
262
263        return (key != null) && key.isValid() && ((key.interestOps() & SelectionKey.OP_WRITE) != 0);
264    }
265
266    /**
267     * {@inheritDoc}
268     */
269    @Override
270    protected void setInterestedInRead(NioSession session, boolean isInterested) throws Exception {
271        SelectionKey key = session.getSelectionKey();
272
273        if ((key == null) || !key.isValid()) {
274            return;
275        }
276
277        int oldInterestOps = key.interestOps();
278        int newInterestOps = oldInterestOps;
279
280        if (isInterested) {
281            newInterestOps |= SelectionKey.OP_READ;
282        } else {
283            newInterestOps &= ~SelectionKey.OP_READ;
284        }
285
286        if (oldInterestOps != newInterestOps) {
287            key.interestOps(newInterestOps);
288        }
289    }
290
291    /**
292     * {@inheritDoc}
293     */
294    @Override
295    protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
296        SelectionKey key = session.getSelectionKey();
297
298        if ((key == null) || !key.isValid()) {
299            return;
300        }
301
302        int newInterestOps = key.interestOps();
303
304        if (isInterested) {
305            newInterestOps |= SelectionKey.OP_WRITE;
306        } else {
307            newInterestOps &= ~SelectionKey.OP_WRITE;
308        }
309
310        key.interestOps(newInterestOps);
311    }
312
313    @Override
314    protected int read(NioSession session, IoBuffer buf) throws Exception {
315        ByteChannel channel = session.getChannel();
316
317        return channel.read(buf.buf());
318    }
319
320    @Override
321    protected int write(NioSession session, IoBuffer buf, int length) throws IOException {
322        if (buf.remaining() <= length) {
323            return session.getChannel().write(buf.buf());
324        }
325
326        int oldLimit = buf.limit();
327        buf.limit(buf.position() + length);
328        try {
329            return session.getChannel().write(buf.buf());
330        } finally {
331            buf.limit(oldLimit);
332        }
333    }
334
335    @Override
336    protected int transferFile(NioSession session, FileRegion region, int length) throws Exception {
337        try {
338            return (int) region.getFileChannel().transferTo(region.getPosition(), length, session.getChannel());
339        } catch (IOException e) {
340            // Check to see if the IOException is being thrown due to
341            // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
342            String message = e.getMessage();
343            if ((message != null) && message.contains("temporarily unavailable")) {
344                return 0;
345            }
346
347            throw e;
348        }
349    }
350
351    /**
352     * An encapsulating iterator around the {@link Selector#selectedKeys()} or
353     * the {@link Selector#keys()} iterator;
354     */
355    protected static class IoSessionIterator<NioSession> implements Iterator<NioSession> {
356        private final Iterator<SelectionKey> iterator;
357
358        /**
359         * Create this iterator as a wrapper on top of the selectionKey Set.
360         *
361         * @param keys
362         *            The set of selected sessions
363         */
364        private IoSessionIterator(Set<SelectionKey> keys) {
365            iterator = keys.iterator();
366        }
367
368        /**
369         * {@inheritDoc}
370         */
371        public boolean hasNext() {
372            return iterator.hasNext();
373        }
374
375        /**
376         * {@inheritDoc}
377         */
378        public NioSession next() {
379            SelectionKey key = iterator.next();
380            NioSession nioSession = (NioSession) key.attachment();
381            return nioSession;
382        }
383
384        /**
385         * {@inheritDoc}
386         */
387        public void remove() {
388            iterator.remove();
389        }
390    }
391}