001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.polling;
021
022import java.io.IOException;
023import java.net.PortUnreachableException;
024import java.nio.channels.ClosedSelectorException;
025import java.util.ArrayList;
026import java.util.Iterator;
027import java.util.List;
028import java.util.Queue;
029import java.util.concurrent.ConcurrentHashMap;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.Executor;
032import java.util.concurrent.atomic.AtomicBoolean;
033import java.util.concurrent.atomic.AtomicInteger;
034import java.util.concurrent.atomic.AtomicReference;
035
036import org.apache.mina.core.buffer.IoBuffer;
037import org.apache.mina.core.file.FileRegion;
038import org.apache.mina.core.filterchain.IoFilterChain;
039import org.apache.mina.core.filterchain.IoFilterChainBuilder;
040import org.apache.mina.core.future.DefaultIoFuture;
041import org.apache.mina.core.service.AbstractIoService;
042import org.apache.mina.core.service.IoProcessor;
043import org.apache.mina.core.service.IoServiceListenerSupport;
044import org.apache.mina.core.session.AbstractIoSession;
045import org.apache.mina.core.session.IoSession;
046import org.apache.mina.core.session.IoSessionConfig;
047import org.apache.mina.core.session.SessionState;
048import org.apache.mina.core.write.WriteRequest;
049import org.apache.mina.core.write.WriteRequestQueue;
050import org.apache.mina.core.write.WriteToClosedSessionException;
051import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
052import org.apache.mina.util.ExceptionMonitor;
053import org.apache.mina.util.NamePreservingRunnable;
054import org.slf4j.Logger;
055import org.slf4j.LoggerFactory;
056
057/**
058 * An abstract implementation of {@link IoProcessor} which helps transport
059 * developers to write an {@link IoProcessor} easily. This class is in charge of
060 * active polling a set of {@link IoSession} and trigger events when some I/O
061 * operation is possible.
062 * 
063 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
064 * 
065 * @param <S>
066 *            the type of the {@link IoSession} this processor can handle
067 */
068public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
069    /** A logger for this class */
070    private static final Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
071
072    /**
073     * A timeout used for the select, as we need to get out to deal with idle
074     * sessions
075     */
076    private static final long SELECT_TIMEOUT = 1000L;
077
078    /** A map containing the last Thread ID for each class */
079    private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<>();
080
081    /** This IoProcessor instance name */
082    private final String threadName;
083
084    /** The executor to use when we need to start the inner Processor */
085    private final Executor executor;
086
087    /** A Session queue containing the newly created sessions */
088    private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
089
090    /** A queue used to store the sessions to be removed */
091    private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
092
093    /** A queue used to store the sessions to be flushed */
094    private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
095
096    /**
097     * A queue used to store the sessions which have a trafficControl to be
098     * updated
099     */
100    private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<>();
101
102    /** The processor thread : it handles the incoming messages */
103    private final AtomicReference<Processor> processorRef = new AtomicReference<>();
104
105    private long lastIdleCheckTime;
106
107    private final Object disposalLock = new Object();
108
109    private volatile boolean disposing;
110
111    private volatile boolean disposed;
112
113    private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
114
115    protected AtomicBoolean wakeupCalled = new AtomicBoolean(false);
116
117    /**
118     * Create an {@link AbstractPollingIoProcessor} with the given
119     * {@link Executor} for handling I/Os events.
120     * 
121     * @param executor
122     *            the {@link Executor} for handling I/O events
123     */
124    protected AbstractPollingIoProcessor(Executor executor) {
125        if (executor == null) {
126            throw new IllegalArgumentException("executor");
127        }
128
129        this.threadName = nextThreadName();
130        this.executor = executor;
131    }
132
133    /**
134     * Compute the thread ID for this class instance. As we may have different
135     * classes, we store the last ID number into a Map associating the class
136     * name to the last assigned ID.
137     * 
138     * @return a name for the current thread, based on the class name and an
139     *         incremental value, starting at 1.
140     */
141    private String nextThreadName() {
142        Class<?> cls = getClass();
143        int newThreadId;
144
145        AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
146
147        if (threadId == null) {
148            newThreadId = 1;
149        } else {
150            // Just increment the last ID, and get it.
151            newThreadId = threadId.incrementAndGet();
152        }
153
154        // Now we can compute the name for this thread
155        return cls.getSimpleName() + '-' + newThreadId;
156    }
157
158    /**
159     * {@inheritDoc}
160     */
161    @Override
162    public final boolean isDisposing() {
163        return disposing;
164    }
165
166    /**
167     * {@inheritDoc}
168     */
169    @Override
170    public final boolean isDisposed() {
171        return disposed;
172    }
173
174    /**
175     * {@inheritDoc}
176     */
177    @Override
178    public final void dispose() {
179        if (disposed || disposing) {
180            return;
181        }
182
183        synchronized (disposalLock) {
184            disposing = true;
185            startupProcessor();
186        }
187
188        disposalFuture.awaitUninterruptibly();
189        disposed = true;
190    }
191
192    /**
193     * Dispose the resources used by this {@link IoProcessor} for polling the
194     * client connections. The implementing class doDispose method will be
195     * called.
196     * 
197     * @throws Exception
198     *             if some low level IO error occurs
199     */
200    protected abstract void doDispose() throws Exception;
201
202    /**
203     * poll those sessions for the given timeout
204     * 
205     * @param timeout
206     *            milliseconds before the call timeout if no event appear
207     * @return The number of session ready for read or for write
208     * @throws Exception
209     *             if some low level IO error occurs
210     */
211    protected abstract int select(long timeout) throws Exception;
212
213    /**
214     * poll those sessions forever
215     * 
216     * @return The number of session ready for read or for write
217     * @throws Exception
218     *             if some low level IO error occurs
219     */
220    protected abstract int select() throws Exception;
221
222    /**
223     * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
224     * is empty
225     * 
226     * @return <tt>true</tt> if at least a session is managed by this {@link IoProcessor}
227     */
228    protected abstract boolean isSelectorEmpty();
229
230    /**
231     * Interrupt the {@link #select(long)} call.
232     */
233    protected abstract void wakeup();
234
235    /**
236     * Get an {@link Iterator} for the list of {@link IoSession} polled by this
237     * {@link IoProcessor}
238     * 
239     * @return {@link Iterator} of {@link IoSession}
240     */
241    protected abstract Iterator<S> allSessions();
242
243    /**
244     * Get an {@link Iterator} for the list of {@link IoSession} found selected
245     * by the last call of {@link #select(long)}
246     * 
247     * @return {@link Iterator} of {@link IoSession} read for I/Os operation
248     */
249    protected abstract Iterator<S> selectedSessions();
250
251    /**
252     * Get the state of a session (One of OPENING, OPEN, CLOSING)
253     * 
254     * @param session the {@link IoSession} to inspect
255     * @return the state of the session
256     */
257    protected abstract SessionState getState(S session);
258    
259    
260    /**
261     * Tells if the session ready for writing
262     * 
263     * @param session the queried session
264     * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
265     */
266    protected abstract boolean isWritable(S session);
267
268    /**
269     * Tells if the session ready for reading
270     * 
271     * @param session the queried session
272     * @return <tt>true</tt> is ready, <tt>false</tt> if not ready
273     */
274    protected abstract boolean isReadable(S session);
275
276    /**
277     * Set the session to be informed when a write event should be processed
278     * 
279     * @param session the session for which we want to be interested in write events
280     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
281     * @throws Exception If there was a problem while registering the session 
282     */
283    protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
284
285    /**
286     * Set the session to be informed when a read event should be processed
287     * 
288     * @param session the session for which we want to be interested in read events
289     * @param isInterested <tt>true</tt> for registering, <tt>false</tt> for removing
290     * @throws Exception If there was a problem while registering the session 
291     */
292    protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
293
294    /**
295     * Tells if this session is registered for reading
296     * 
297     * @param session the queried session
298     * @return <tt>true</tt> is registered for reading
299     */
300    protected abstract boolean isInterestedInRead(S session);
301
302    /**
303     * Tells if this session is registered for writing
304     * 
305     * @param session the queried session
306     * @return <tt>true</tt> is registered for writing
307     */
308    protected abstract boolean isInterestedInWrite(S session);
309
310    /**
311     * Initialize the polling of a session. Add it to the polling process.
312     * 
313     * @param session the {@link IoSession} to add to the polling
314     * @throws Exception any exception thrown by the underlying system calls
315     */
316    protected abstract void init(S session) throws Exception;
317
318    /**
319     * Destroy the underlying client socket handle
320     * 
321     * @param session the {@link IoSession}
322     * @throws Exception any exception thrown by the underlying system calls
323     */
324    protected abstract void destroy(S session) throws Exception;
325
326    /**
327     * Reads a sequence of bytes from a {@link IoSession} into the given
328     * {@link IoBuffer}. Is called when the session was found ready for reading.
329     * 
330     * @param session the session to read
331     * @param buf the buffer to fill
332     * @return the number of bytes read
333     * @throws Exception any exception thrown by the underlying system calls
334     */
335    protected abstract int read(S session, IoBuffer buf) throws Exception;
336
337    /**
338     * Write a sequence of bytes to a {@link IoSession}, means to be called when
339     * a session was found ready for writing.
340     * 
341     * @param session the session to write
342     * @param buf the buffer to write
343     * @param length the number of bytes to write can be superior to the number of
344     *            bytes remaining in the buffer
345     * @return the number of byte written
346     * @throws IOException any exception thrown by the underlying system calls
347     */
348    protected abstract int write(S session, IoBuffer buf, int length) throws IOException;
349
350    /**
351     * Write a part of a file to a {@link IoSession}, if the underlying API
352     * isn't supporting system calls like sendfile(), you can throw a
353     * {@link UnsupportedOperationException} so the file will be send using
354     * usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
355     * 
356     * @param session the session to write
357     * @param region the file region to write
358     * @param length the length of the portion to send
359     * @return the number of written bytes
360     * @throws Exception any exception thrown by the underlying system calls
361     */
362    protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
363
364    /**
365     * {@inheritDoc}
366     */
367    @Override
368    public final void add(S session) {
369        if (disposed || disposing) {
370            throw new IllegalStateException("Already disposed.");
371        }
372
373        // Adds the session to the newSession queue and starts the worker
374        newSessions.add(session);
375        startupProcessor();
376    }
377
378    /**
379     * {@inheritDoc}
380     */
381    @Override
382    public final void remove(S session) {
383        scheduleRemove(session);
384        startupProcessor();
385    }
386
387    private void scheduleRemove(S session) {
388        if (!removingSessions.contains(session)) {
389            removingSessions.add(session);
390        }
391    }
392
393    /**
394     * {@inheritDoc}
395     */
396    @Override
397    public void write(S session, WriteRequest writeRequest) {
398        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
399
400        writeRequestQueue.offer(session, writeRequest);
401
402        if (!session.isWriteSuspended()) {
403            this.flush(session);
404        }
405    }
406
407    /**
408     * {@inheritDoc}
409     */
410    @Override
411    public final void flush(S session) {
412        // add the session to the queue if it's not already
413        // in the queue, then wake up the select()
414        if (session.setScheduledForFlush(true)) {
415            flushingSessions.add(session);
416            wakeup();
417        }
418    }
419
420    private void scheduleFlush(S session) {
421        // add the session to the queue if it's not already
422        // in the queue
423        if (session.setScheduledForFlush(true)) {
424            flushingSessions.add(session);
425        }
426    }
427
428    /**
429     * Updates the traffic mask for a given session
430     * 
431     * @param session the session to update
432     */
433    public final void updateTrafficMask(S session) {
434        trafficControllingSessions.add(session);
435        wakeup();
436    }
437
438    /**
439     * Starts the inner Processor, asking the executor to pick a thread in its
440     * pool. The Runnable will be renamed
441     */
442    private void startupProcessor() {
443        Processor processor = processorRef.get();
444
445        if (processor == null) {
446            processor = new Processor();
447
448            if (processorRef.compareAndSet(null, processor)) {
449                executor.execute(new NamePreservingRunnable(processor, threadName));
450            }
451        }
452
453        // Just stop the select() and start it again, so that the processor
454        // can be activated immediately.
455        wakeup();
456    }
457
458    /**
459     * In the case we are using the java select() method, this method is used to
460     * trash the buggy selector and create a new one, registring all the sockets
461     * on it.
462     * 
463     * @throws IOException If we got an exception
464     */
465    protected abstract void registerNewSelector() throws IOException;
466
467    /**
468     * Check that the select() has not exited immediately just because of a
469     * broken connection. In this case, this is a standard case, and we just
470     * have to loop.
471     * 
472     * @return <tt>true</tt> if a connection has been brutally closed.
473     * @throws IOException If we got an exception
474     */
475    protected abstract boolean isBrokenConnection() throws IOException;
476
477    /**
478     * Loops over the new sessions blocking queue and returns the number of
479     * sessions which are effectively created
480     * 
481     * @return The number of new sessions
482     */
483    private int handleNewSessions() {
484        int addedSessions = 0;
485
486        for (S session = newSessions.poll(); session != null; session = newSessions.poll()) {
487            if (addNow(session)) {
488                // A new session has been created
489                addedSessions++;
490            }
491        }
492
493        return addedSessions;
494    }
495
496    /**
497     * Process a new session : - initialize it - create its chain - fire the
498     * CREATED listeners if any
499     * 
500     * @param session The session to create
501     * @return <tt>true</tt> if the session has been registered
502     */
503    private boolean addNow(S session) {
504        boolean registered = false;
505
506        try {
507            init(session);
508            registered = true;
509
510            // Build the filter chain of this session.
511            IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
512            chainBuilder.buildFilterChain(session.getFilterChain());
513
514            // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
515            // in AbstractIoFilterChain.fireSessionOpened().
516            // Propagate the SESSION_CREATED event up to the chain
517            IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
518            listeners.fireSessionCreated(session);
519        } catch (Exception e) {
520            ExceptionMonitor.getInstance().exceptionCaught(e);
521
522            try {
523                destroy(session);
524            } catch (Exception e1) {
525                ExceptionMonitor.getInstance().exceptionCaught(e1);
526            } finally {
527                registered = false;
528            }
529        }
530
531        return registered;
532    }
533
534    private int removeSessions() {
535        int removedSessions = 0;
536
537        for (S session = removingSessions.poll(); session != null;session = removingSessions.poll()) {
538            SessionState state = getState(session);
539
540            // Now deal with the removal accordingly to the session's state
541            switch (state) {
542                case OPENED:
543                    // Try to remove this session
544                    if (removeNow(session)) {
545                        removedSessions++;
546                    }
547                    
548                    break;
549    
550                case CLOSING:
551                    // Skip if channel is already closed
552                    // In any case, remove the session from the queue
553                    removedSessions++;
554                    break;
555    
556                case OPENING:
557                    // Remove session from the newSessions queue and
558                    // remove it
559                    newSessions.remove(session);
560    
561                    if (removeNow(session)) {
562                        removedSessions++;
563                    }
564    
565                    break;
566    
567                default:
568                    throw new IllegalStateException(String.valueOf(state));
569            }
570        }
571
572        return removedSessions;
573    }
574
575    private boolean removeNow(S session) {
576        clearWriteRequestQueue(session);
577
578        try {
579            destroy(session);
580            return true;
581        } catch (Exception e) {
582            IoFilterChain filterChain = session.getFilterChain();
583            filterChain.fireExceptionCaught(e);
584        } finally {
585            try {
586                clearWriteRequestQueue(session);
587                ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
588            } catch (Exception e) {
589                // The session was either destroyed or not at this point.
590                // We do not want any exception thrown from this "cleanup" code to change
591                // the return value by bubbling up.
592                IoFilterChain filterChain = session.getFilterChain();
593                filterChain.fireExceptionCaught(e);
594            }
595        }
596        
597        return false;
598    }
599
600    private void clearWriteRequestQueue(S session) {
601        WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
602        WriteRequest req;
603
604        List<WriteRequest> failedRequests = new ArrayList<>();
605
606        if ((req = writeRequestQueue.poll(session)) != null) {
607            Object message = req.getMessage();
608
609            if (message instanceof IoBuffer) {
610                IoBuffer buf = (IoBuffer) message;
611
612                // The first unwritten empty buffer must be
613                // forwarded to the filter chain.
614                if (buf.hasRemaining()) {
615                    buf.reset();
616                    failedRequests.add(req);
617                } else {
618                    IoFilterChain filterChain = session.getFilterChain();
619                    filterChain.fireMessageSent(req);
620                }
621            } else {
622                failedRequests.add(req);
623            }
624
625            // Discard others.
626            while ((req = writeRequestQueue.poll(session)) != null) {
627                failedRequests.add(req);
628            }
629        }
630
631        // Create an exception and notify.
632        if (!failedRequests.isEmpty()) {
633            WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
634
635            for (WriteRequest r : failedRequests) {
636                session.decreaseScheduledBytesAndMessages(r);
637                r.getFuture().setException(cause);
638            }
639
640            IoFilterChain filterChain = session.getFilterChain();
641            filterChain.fireExceptionCaught(cause);
642        }
643    }
644
645    private void process() throws Exception {
646        for (Iterator<S> i = selectedSessions(); i.hasNext();) {
647            S session = i.next();
648            process(session);
649            i.remove();
650        }
651    }
652
653    /**
654     * Deal with session ready for the read or write operations, or both.
655     */
656    private void process(S session) {
657        // Process Reads
658        if (isReadable(session) && !session.isReadSuspended()) {
659            read(session);
660        }
661
662        // Process writes
663        if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
664            // add the session to the queue, if it's not already there
665            flushingSessions.add(session);
666        }
667    }
668
669    private void read(S session) {
670        IoSessionConfig config = session.getConfig();
671        int bufferSize = config.getReadBufferSize();
672        IoBuffer buf = IoBuffer.allocate(bufferSize);
673
674        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
675
676        try {
677            int readBytes = 0;
678            int ret;
679
680            try {
681                if (hasFragmentation) {
682
683                    while ((ret = read(session, buf)) > 0) {
684                        readBytes += ret;
685
686                        if (!buf.hasRemaining()) {
687                            break;
688                        }
689                    }
690                } else {
691                    ret = read(session, buf);
692
693                    if (ret > 0) {
694                        readBytes = ret;
695                    }
696                }
697            } finally {
698                buf.flip();
699            }
700
701            if (readBytes > 0) {
702                IoFilterChain filterChain = session.getFilterChain();
703                filterChain.fireMessageReceived(buf);
704                buf = null;
705
706                if (hasFragmentation) {
707                    if (readBytes << 1 < config.getReadBufferSize()) {
708                        session.decreaseReadBufferSize();
709                    } else if (readBytes == config.getReadBufferSize()) {
710                        session.increaseReadBufferSize();
711                    }
712                }
713            }
714
715            if (ret < 0) {
716                IoFilterChain filterChain = session.getFilterChain();
717                filterChain.fireInputClosed();
718            }
719        } catch (Exception e) {
720            if (e instanceof IOException) {
721                if (!(e instanceof PortUnreachableException)
722                        || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
723                        || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
724                    scheduleRemove(session);
725                }
726            }
727
728            IoFilterChain filterChain = session.getFilterChain();
729            filterChain.fireExceptionCaught(e);
730        }
731    }
732
733    private void notifyIdleSessions(long currentTime) throws Exception {
734        // process idle sessions
735        if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
736            lastIdleCheckTime = currentTime;
737            AbstractIoSession.notifyIdleness(allSessions(), currentTime);
738        }
739    }
740
741    /**
742     * Write all the pending messages
743     */
744    private void flush(long currentTime) {
745        if (flushingSessions.isEmpty()) {
746            return;
747        }
748
749        do {
750            S session = flushingSessions.poll(); // the same one with
751                                                 // firstSession
752
753            if (session == null) {
754                // Just in case ... It should not happen.
755                break;
756            }
757
758            // Reset the Schedule for flush flag for this session,
759            // as we are flushing it now
760            session.unscheduledForFlush();
761
762            SessionState state = getState(session);
763
764            switch (state) {
765                case OPENED:
766                    try {
767                        boolean flushedAll = flushNow(session, currentTime);
768    
769                        if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
770                                && !session.isScheduledForFlush()) {
771                            scheduleFlush(session);
772                        }
773                    } catch (Exception e) {
774                        scheduleRemove(session);
775                        session.closeNow();
776                        IoFilterChain filterChain = session.getFilterChain();
777                        filterChain.fireExceptionCaught(e);
778                    }
779    
780                    break;
781    
782                case CLOSING:
783                    // Skip if the channel is already closed.
784                    break;
785    
786                case OPENING:
787                    // Retry later if session is not yet fully initialized.
788                    // (In case that Session.write() is called before addSession()
789                    // is processed)
790                    scheduleFlush(session);
791                    return;
792    
793                default:
794                    throw new IllegalStateException(String.valueOf(state));
795            }
796
797        } while (!flushingSessions.isEmpty());
798    }
799
800    private boolean flushNow(S session, long currentTime) {
801        if (!session.isConnected()) {
802            scheduleRemove(session);
803            return false;
804        }
805
806        final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
807
808        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
809
810        // Set limitation for the number of written bytes for read-write
811        // fairness. I used maxReadBufferSize * 3 / 2, which yields best
812        // performance in my experience while not breaking fairness much.
813        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
814                + (session.getConfig().getMaxReadBufferSize() >>> 1);
815        int writtenBytes = 0;
816        WriteRequest req = null;
817
818        try {
819            // Clear OP_WRITE
820            setInterestedInWrite(session, false);
821
822            do {
823                // Check for pending writes.
824                req = session.getCurrentWriteRequest();
825
826                if (req == null) {
827                    req = writeRequestQueue.poll(session);
828
829                    if (req == null) {
830                        break;
831                    }
832
833                    session.setCurrentWriteRequest(req);
834                }
835
836                int localWrittenBytes;
837                Object message = req.getMessage();
838
839                if (message instanceof IoBuffer) {
840                    localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
841                            currentTime);
842
843                    if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
844                        // the buffer isn't empty, we re-interest it in writing
845                        writtenBytes += localWrittenBytes;
846                        setInterestedInWrite(session, true);
847                        return false;
848                    }
849                } else if (message instanceof FileRegion) {
850                    localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
851                            currentTime);
852
853                    // Fix for Java bug on Linux
854                    // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
855                    // If there's still data to be written in the FileRegion,
856                    // return 0 indicating that we need
857                    // to pause until writing may resume.
858                    if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
859                        writtenBytes += localWrittenBytes;
860                        setInterestedInWrite(session, true);
861                        return false;
862                    }
863                } else {
864                    throw new IllegalStateException("Don't know how to handle message of type '"
865                            + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
866                }
867
868                if (localWrittenBytes == 0) {
869
870                    // Kernel buffer is full.
871                    if (!req.equals(AbstractIoSession.MESSAGE_SENT_REQUEST)) {
872                        setInterestedInWrite(session, true);
873                        return false;
874                    }
875                } else {
876                    writtenBytes += localWrittenBytes;
877    
878                    if (writtenBytes >= maxWrittenBytes) {
879                        // Wrote too much
880                        scheduleFlush(session);
881                        return false;
882                    }
883                }
884
885                if (message instanceof IoBuffer) {
886                    ((IoBuffer) message).free();
887                }
888            } while (writtenBytes < maxWrittenBytes);
889        } catch (Exception e) {
890            if (req != null) {
891                req.getFuture().setException(e);
892            }
893
894            IoFilterChain filterChain = session.getFilterChain();
895            filterChain.fireExceptionCaught(e);
896            return false;
897        }
898
899        return true;
900    }
901
902    private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
903            throws Exception {
904        IoBuffer buf = (IoBuffer) req.getMessage();
905        int localWrittenBytes = 0;
906
907        if (buf.hasRemaining()) {
908            int length;
909
910            if (hasFragmentation) {
911                length = Math.min(buf.remaining(), maxLength);
912            } else {
913                length = buf.remaining();
914            }
915
916            try {
917                localWrittenBytes = write(session, buf, length);
918            } catch (IOException ioe) {
919                // We have had an issue while trying to send data to the
920                // peer : let's close the session.
921                buf.free();
922                session.closeNow();
923                removeNow(session);
924
925                return 0;
926            }
927        }
928
929        session.increaseWrittenBytes(localWrittenBytes, currentTime);
930        
931        // Now, forward the original message
932        if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
933            // Buffer has been sent, clear the current request.
934            Object originalMessage = req.getOriginalRequest().getMessage();
935
936            if (originalMessage instanceof IoBuffer) {
937                buf = ((IoBuffer)req.getOriginalRequest().getMessage());
938
939                int pos = buf.position();
940                buf.reset();
941                fireMessageSent(session, req);
942                // And set it back to its position
943                buf.position(pos);
944            } else {
945                fireMessageSent(session, req);
946            }
947        }
948
949        return localWrittenBytes;
950    }
951
952    private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
953            throws Exception {
954        int localWrittenBytes;
955        FileRegion region = (FileRegion) req.getMessage();
956
957        if (region.getRemainingBytes() > 0) {
958            int length;
959
960            if (hasFragmentation) {
961                length = (int) Math.min(region.getRemainingBytes(), maxLength);
962            } else {
963                length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
964            }
965
966            localWrittenBytes = transferFile(session, region, length);
967            region.update(localWrittenBytes);
968        } else {
969            localWrittenBytes = 0;
970        }
971
972        session.increaseWrittenBytes(localWrittenBytes, currentTime);
973
974        if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
975            fireMessageSent(session, req);
976        }
977
978        return localWrittenBytes;
979    }
980
981    private void fireMessageSent(S session, WriteRequest req) {
982        session.setCurrentWriteRequest(null);
983        IoFilterChain filterChain = session.getFilterChain();
984        filterChain.fireMessageSent(req);
985    }
986
987    /**
988     * Update the trafficControl for all the session.
989     */
990    private void updateTrafficMask() {
991        int queueSize = trafficControllingSessions.size();
992
993        while (queueSize > 0) {
994            S session = trafficControllingSessions.poll();
995
996            if (session == null) {
997                // We are done with this queue.
998                return;
999            }
1000
1001            SessionState state = getState(session);
1002
1003            switch (state) {
1004            case OPENED:
1005                updateTrafficControl(session);
1006
1007                break;
1008
1009            case CLOSING:
1010                break;
1011
1012            case OPENING:
1013                // Retry later if session is not yet fully initialized.
1014                // (In case that Session.suspend??() or session.resume??() is
1015                // called before addSession() is processed)
1016                // We just put back the session at the end of the queue.
1017                trafficControllingSessions.add(session);
1018                break;
1019
1020            default:
1021                throw new IllegalStateException(String.valueOf(state));
1022            }
1023
1024            // As we have handled one session, decrement the number of
1025            // remaining sessions. The OPENING session will be processed
1026            // with the next select(), as the queue size has been decreased,
1027            // even
1028            // if the session has been pushed at the end of the queue
1029            queueSize--;
1030        }
1031    }
1032
1033    /**
1034     * {@inheritDoc}
1035     */
1036    @Override
1037    public void updateTrafficControl(S session) {
1038        //
1039        try {
1040            setInterestedInRead(session, !session.isReadSuspended());
1041        } catch (Exception e) {
1042            IoFilterChain filterChain = session.getFilterChain();
1043            filterChain.fireExceptionCaught(e);
1044        }
1045
1046        try {
1047            setInterestedInWrite(session,
1048                    !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1049        } catch (Exception e) {
1050            IoFilterChain filterChain = session.getFilterChain();
1051            filterChain.fireExceptionCaught(e);
1052        }
1053    }
1054
1055    /**
1056     * The main loop. This is the place in charge to poll the Selector, and to
1057     * process the active sessions. It's done in - handle the newly created
1058     * sessions -
1059     */
1060    private class Processor implements Runnable {
1061        public void run() {
1062            assert (processorRef.get() == this);
1063
1064            int nSessions = 0;
1065            lastIdleCheckTime = System.currentTimeMillis();
1066            int nbTries = 10;
1067
1068            for (;;) {
1069                try {
1070                    // This select has a timeout so that we can manage
1071                    // idle session when we get out of the select every
1072                    // second. (note : this is a hack to avoid creating
1073                    // a dedicated thread).
1074                    long t0 = System.currentTimeMillis();
1075                    int selected = select(SELECT_TIMEOUT);
1076                    long t1 = System.currentTimeMillis();
1077                    long delta = t1 - t0;
1078
1079                    if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
1080                        // Last chance : the select() may have been
1081                        // interrupted because we have had an closed channel.
1082                        if (isBrokenConnection()) {
1083                            LOG.warn("Broken connection");
1084                        } else {
1085                            // Ok, we are hit by the nasty epoll
1086                            // spinning.
1087                            // Basically, there is a race condition
1088                            // which causes a closing file descriptor not to be
1089                            // considered as available as a selected channel,
1090                            // but
1091                            // it stopped the select. The next time we will
1092                            // call select(), it will exit immediately for the
1093                            // same
1094                            // reason, and do so forever, consuming 100%
1095                            // CPU.
1096                            // We have to destroy the selector, and
1097                            // register all the socket on a new one.
1098                            if (nbTries == 0) {
1099                                LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
1100                                registerNewSelector();
1101                                nbTries = 10;
1102                            } else {
1103                                nbTries--;
1104                            }
1105                        }
1106                    } else {
1107                        nbTries = 10;
1108                    }
1109
1110                    // Manage newly created session first
1111                    nSessions += handleNewSessions();
1112
1113                    updateTrafficMask();
1114
1115                    // Now, if we have had some incoming or outgoing events,
1116                    // deal with them
1117                    if (selected > 0) {
1118                        // LOG.debug("Processing ..."); // This log hurts one of
1119                        // the MDCFilter test...
1120                        process();
1121                    }
1122
1123                    // Write the pending requests
1124                    long currentTime = System.currentTimeMillis();
1125                    flush(currentTime);
1126
1127                    // And manage removed sessions
1128                    nSessions -= removeSessions();
1129
1130                    // Last, not least, send Idle events to the idle sessions
1131                    notifyIdleSessions(currentTime);
1132
1133                    // Get a chance to exit the infinite loop if there are no
1134                    // more sessions on this Processor
1135                    if (nSessions == 0) {
1136                        processorRef.set(null);
1137
1138                        if (newSessions.isEmpty() && isSelectorEmpty()) {
1139                            // newSessions.add() precedes startupProcessor
1140                            assert (processorRef.get() != this);
1141                            break;
1142                        }
1143
1144                        assert (processorRef.get() != this);
1145
1146                        if (!processorRef.compareAndSet(null, this)) {
1147                            // startupProcessor won race, so must exit processor
1148                            assert (processorRef.get() != this);
1149                            break;
1150                        }
1151
1152                        assert (processorRef.get() == this);
1153                    }
1154
1155                    // Disconnect all sessions immediately if disposal has been
1156                    // requested so that we exit this loop eventually.
1157                    if (isDisposing()) {
1158                        boolean hasKeys = false;
1159                        
1160                        for (Iterator<S> i = allSessions(); i.hasNext();) {
1161                            IoSession session = i.next();
1162                            
1163                            if (session.isActive()) {
1164                                scheduleRemove((S)session);
1165                                hasKeys = true;
1166                            }
1167                        }
1168
1169                        if (hasKeys) {
1170                            wakeup();
1171                        }
1172                    }
1173                } catch (ClosedSelectorException cse) {
1174                    // If the selector has been closed, we can exit the loop
1175                    // But first, dump a stack trace
1176                    ExceptionMonitor.getInstance().exceptionCaught(cse);
1177                    break;
1178                } catch (Exception e) {
1179                    ExceptionMonitor.getInstance().exceptionCaught(e);
1180
1181                    try {
1182                        Thread.sleep(1000);
1183                    } catch (InterruptedException e1) {
1184                        ExceptionMonitor.getInstance().exceptionCaught(e1);
1185                    }
1186                }
1187            }
1188
1189            try {
1190                synchronized (disposalLock) {
1191                    if (disposing) {
1192                        doDispose();
1193                    }
1194                }
1195            } catch (Exception e) {
1196                ExceptionMonitor.getInstance().exceptionCaught(e);
1197            } finally {
1198                disposalFuture.setValue(true);
1199            }
1200        }
1201    }
1202}