View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.net.PortUnreachableException;
24  import java.nio.channels.ClosedSelectorException;
25  import java.util.ArrayList;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Queue;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.atomic.AtomicBoolean;
33  import java.util.concurrent.atomic.AtomicInteger;
34  import java.util.concurrent.atomic.AtomicReference;
35  
36  import org.apache.mina.core.buffer.IoBuffer;
37  import org.apache.mina.core.file.FileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.filterchain.IoFilterChainBuilder;
40  import org.apache.mina.core.future.DefaultIoFuture;
41  import org.apache.mina.core.service.AbstractIoService;
42  import org.apache.mina.core.service.IoProcessor;
43  import org.apache.mina.core.service.IoServiceListenerSupport;
44  import org.apache.mina.core.session.AbstractIoSession;
45  import org.apache.mina.core.session.IoSession;
46  import org.apache.mina.core.session.IoSessionConfig;
47  import org.apache.mina.core.session.SessionState;
48  import org.apache.mina.core.write.WriteRequest;
49  import org.apache.mina.core.write.WriteRequestQueue;
50  import org.apache.mina.core.write.WriteToClosedSessionException;
51  import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
52  import org.apache.mina.util.ExceptionMonitor;
53  import org.apache.mina.util.NamePreservingRunnable;
54  import org.slf4j.Logger;
55  import org.slf4j.LoggerFactory;
56  
57  /**
58   * An abstract implementation of {@link IoProcessor} which helps transport
59   * developers to write an {@link IoProcessor} easily. This class is in charge of
60   * actively polling a set of {@link IoSession}s and triggering events when some I/O
61   * operation is possible.
62   *
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   *
65   * @param <S> the type of the {@link IoSession} this processor can handle
66   */
67  public abstract class AbstractPollingIoProcessor<S extends AbstractIoSession> implements IoProcessor<S> {
68      /** A logger for this class (it is registered under the IoProcessor interface name) */
69      private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
70  
71      /**
72       * The maximum loop count for a write operation until
73       * {@link #write(AbstractIoSession, IoBuffer, int)} returns a non-zero value.
74       * It is similar to what a spin lock is for in concurrent programming. It
75       * improves memory utilization and write throughput significantly.
76       */
77      private static final int WRITE_SPIN_COUNT = 256;
78  
79      /**
80       * A timeout used for the select, as we need to get out to deal with idle
81       * sessions. It is also the minimal delay between two idle checks.
82       */
83      private static final long SELECT_TIMEOUT = 1000L;
84  
85      /** A map containing the last thread ID assigned for each processor class */
86      private static final ConcurrentHashMap<Class<?>, AtomicInteger> threadIds = new ConcurrentHashMap<Class<?>, AtomicInteger>();
87  
88      /** This IoProcessor instance name, given to the Runnable wrapping the inner Processor */
89      private final String threadName;
90  
91      /** The executor to use when we need to start the inner Processor */
92      private final Executor executor;
93  
94      /** A Session queue containing the newly created sessions */
95      private final Queue<S> newSessions = new ConcurrentLinkedQueue<S>();
96  
97      /** A queue used to store the sessions to be removed */
98      private final Queue<S> removingSessions = new ConcurrentLinkedQueue<S>();
99  
100     /** A queue used to store the sessions to be flushed */
101     private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<S>();
102 
103     /**
104      * A queue used to store the sessions which have a trafficControl to be
105      * updated
106      */
107     private final Queue<S> trafficControllingSessions = new ConcurrentLinkedQueue<S>();
108 
109     /** The inner Processor currently referenced, or null if there is none : it handles the incoming messages */
110     private final AtomicReference<Processor> processorRef = new AtomicReference<Processor>();
111 
112     private long lastIdleCheckTime; // time of the last idle check, see notifyIdleSessions()
113 
114     private final Object disposalLock = new Object(); // held by dispose() while it flags the disposal
115 
116     private volatile boolean disposing; // set by dispose() as soon as a disposal has been requested
117 
118     private volatile boolean disposed; // set by dispose() once disposalFuture has completed
119 
120     private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null); // awaited by dispose()
121 
122     protected AtomicBoolean wakeupCalled = new AtomicBoolean(false); // NOTE(review): unused in this part of the file; presumably managed by the wakeup() implementations - confirm
123 
124     /**
125      * Builds a processor whose polling work will be run by the given
126      * {@link Executor}.
127      *
128      * @param executor the {@link Executor} for handling I/O events
129      * @throws IllegalArgumentException if the executor is null
130      */
131     protected AbstractPollingIoProcessor(Executor executor) {
132         if (null == executor) {
133             throw new IllegalArgumentException("executor");
134         }
135 
136         this.executor = executor;
137         this.threadName = nextThreadName();
138     }
139 
140     /**
141      * Compute the thread ID for this class instance. As we may have different
142      * classes, we store the last ID number into a Map associating the class
143      * name to the last assigned ID.
144      *
145      * @return a name for the current thread, based on the class name and an
146      *         incremental value, starting at 1.
147      */
148     private String nextThreadName() {
149         Class<?> cls = getClass();
150         int newThreadId;
151 
152         AtomicInteger threadId = threadIds.putIfAbsent(cls, new AtomicInteger(1));
153 
154         if (threadId == null) {
155             newThreadId = 1;
156         } else {
157             // Just increment the last ID, and get it.
158             newThreadId = threadId.incrementAndGet();
159         }
160 
161         // Now we can compute the name for this thread
162         return cls.getSimpleName() + '-' + newThreadId;
163     }
164 
165     /**
166      * {@inheritDoc}
167      */
168     public final boolean isDisposing() {
169         return this.disposing;
170     }
171 
172     /**
173      * {@inheritDoc}
174      */
175     public final boolean isDisposed() {
176         return this.disposed;
177     }
178 
179     /**
180      * {@inheritDoc}
181      */
182     public final void dispose() {
183         if (disposed || disposing) {
184             return;
185         }
186         // Flag the disposal and make sure a Processor is there, awake, to notice it
187         synchronized (disposalLock) {
188             disposing = true;
189             startupProcessor();
190         }
191         // Blocks until disposalFuture completes; NOTE(review): it is presumably completed by the inner Processor - confirm
192         disposalFuture.awaitUninterruptibly();
193         disposed = true;
194     }
195 
196     /**
197      * Dispose the resources used by this {@link IoProcessor} for polling the
198      * client connections. The concrete subclass supplies the actual cleanup.
199      *
200      * @throws Exception if some low level IO error occurs
201      */
202     protected abstract void doDispose() throws Exception;
203 
204     /**
205      * Polls the sessions, waiting at most for the given timeout
206      *
207      * @param timeout
208      *            milliseconds before the call times out if no event appears
209      * @return The number of sessions ready for read or for write
210      * @throws Exception
211      *             if some low level IO error occurs
212      */
213     protected abstract int select(long timeout) throws Exception;
214 
215     /**
216      * Polls the sessions, without any timeout
217      *
218      * @return The number of sessions ready for read or for write
219      * @throws Exception
220      *             if some low level IO error occurs
221      */
222     protected abstract int select() throws Exception;
223 
224     /**
225      * Say if the list of {@link IoSession} polled by this {@link IoProcessor}
226      * is empty
227      *
228      * @return true if no session is managed by this {@link IoProcessor}
229      */
230     protected abstract boolean isSelectorEmpty();
231 
232     /**
233      * Interrupts the {@link #select(long)} call, so that queued work is handled without waiting.
234      */
235     protected abstract void wakeup();
236 
237     /**
238      * Get an {@link Iterator} for the list of {@link IoSession} polled by this
239      * {@link IoProcessor}
240      *
241      * @return an {@link Iterator} over all the polled {@link IoSession}s
242      */
243     protected abstract Iterator<S> allSessions();
244 
245     /**
246      * Get an {@link Iterator} for the list of {@link IoSession} found selected
247      * by the last call of {@link #select(long)}
248      *
249      * @return {@link Iterator} of {@link IoSession} ready for I/O operations
250      */
251     protected abstract Iterator<S> selectedSessions();
252 
253     /**
254      * Get the state of a session (OPENING, OPENED or CLOSING)
255      *
256      * @param session
257      *            the {@link IoSession} to inspect
258      * @return the state of the session
259      */
260     protected abstract SessionState getState(S session);
261 
262     /**
263      * Tells if the session is ready for writing
264      *
265      * @param session
266      *            the session queried
267      * @return true if ready, false if not ready
268      */
269     protected abstract boolean isWritable(S session);
270 
271     /**
272      * Tells if the session is ready for reading
273      *
274      * @param session
275      *            the session queried
276      * @return true if ready, false if not ready
277      */
278     protected abstract boolean isReadable(S session);
279 
280     /**
281      * Registers a session for writing, or removes that registration
282      *
283      * @param session the session registered
284      * @param isInterested
285      *            true for registering, false for removing
286      * @throws Exception if the implementation fails to update the registration
287      */
288     protected abstract void setInterestedInWrite(S session, boolean isInterested) throws Exception;
289 
290     /**
291      * Registers a session for reading, or removes that registration
292      *
293      * @param session the session registered
294      * @param isInterested
295      *            true for registering, false for removing
296      * @throws Exception if the implementation fails to update the registration
297      */
298     protected abstract void setInterestedInRead(S session, boolean isInterested) throws Exception;
299 
300     /**
301      * Tells if this session is registered for reading
302      *
303      * @param session
304      *            the session queried
305      * @return true if registered for reading
306      */
307     protected abstract boolean isInterestedInRead(S session);
308 
309     /**
310      * Tells if this session is registered for writing
311      *
312      * @param session
313      *            the session queried
314      * @return true if registered for writing
315      */
316     protected abstract boolean isInterestedInWrite(S session);
317 
318     /**
319      * Initialize the polling of a session. Add it to the polling process.
320      *
321      * @param session the {@link IoSession} to add to the polling
322      * @throws Exception any exception thrown by the underlying system calls; the caller then destroys the session
323      */
324     protected abstract void init(S session) throws Exception;
325 
326     /**
327      * Destroys the underlying client socket handle
328      *
329      * @param session
330      *            the {@link IoSession} to destroy
331      * @throws Exception
332      *             any exception thrown by the underlying system calls
333      */
334     protected abstract void destroy(S session) throws Exception;
335 
336     /**
337      * Reads a sequence of bytes from an {@link IoSession} into the given
338      * {@link IoBuffer}. It is called when the session was found ready for
339      * reading.
340      *
341      * @param session the session to read
342      * @param buf the buffer to fill
343      * @return the number of bytes read; the caller treats a negative value
344      *         as a closed input
345      * @throws Exception
346      *             any exception thrown by the underlying system calls
347      */
348     protected abstract int read(S session, IoBuffer buf) throws Exception;
349 
350     /**
351      * Writes a sequence of bytes to an {@link IoSession}. It is meant to be
352      * called when a session was found ready for writing.
353      *
354      * @param session
355      *            the session to write
356      * @param buf
357      *            the buffer to write
358      * @param length
359      *            the number of bytes to write; it can be greater than the
360      *            number of bytes remaining in the buffer
361      * @return the number of bytes written
362      * @throws Exception
363      *             any exception thrown by the underlying system calls
364      */
365     protected abstract int write(S session, IoBuffer buf, int length) throws Exception;
366 
367     /**
368      * Writes a part of a file to an {@link IoSession}. If the underlying API
369      * does not support system calls like sendfile(), you can throw an
370      * {@link UnsupportedOperationException} so that the file is sent using
371      * the usual {@link #write(AbstractIoSession, IoBuffer, int)} call.
372      *
373      * @param session
374      *            the session to write
375      * @param region
376      *            the file region to write
377      * @param length
378      *            the length of the portion to send
379      * @return the number of written bytes
380      * @throws Exception
381      *             any exception thrown by the underlying system calls
382      */
383     protected abstract int transferFile(S session, FileRegion region, int length) throws Exception;
384 
385     /**
386      * {@inheritDoc}
387      */
388     public final void add(S session) {
389         final boolean shuttingDown = disposed || disposing;
390         if (shuttingDown) {
391             throw new IllegalStateException("Already disposed.");
392         }
393         // Queue the session, then make sure a Processor is running to pick it up
394         newSessions.add(session);
395         startupProcessor();
396     }
397 
398     /**
399      * {@inheritDoc}
400      */
401     public final void remove(S session) {
402         removingSessions.add(session); // what scheduleRemove(session) does
403         startupProcessor();
404     }
405 
406     private void scheduleRemove(S session) {
407         removingSessions.offer(session); // unbounded queue: offer() and add() behave alike
408     }
409 
410     /**
411      * {@inheritDoc}
412      */
413     public void write(S session, WriteRequest writeRequest) {
414         session.getWriteRequestQueue().offer(session, writeRequest);
415 
416         if (session.isWriteSuspended()) {
417             // Writes are suspended: the request simply stays in the queue
418             return;
419         }
420         flush(session);
421     }
422 
423     /**
424      * {@inheritDoc}
425      */
426     public final void flush(S session) {
427         if (!session.setScheduledForFlush(true)) {
428             // The session is already in the flushing queue
429             return;
430         }
431         flushingSessions.add(session);
432         wakeup(); // interrupt the select() so that the queue is looked at
433     }
434 
435     private void scheduleFlush(S session) {
436         // add the session to the queue if it's not already
437         // in the queue
438         if (session.setScheduledForFlush(true)) {
439             flushingSessions.add(session);
440         }
441     }
442 
443     /**
444      * Requests an update of the traffic mask of a session: the session is
445      * queued for that purpose and the select() is interrupted.
446      *
447      * @param session the session to update
448      */
449     public final void updateTrafficMask(S session) {
450         this.trafficControllingSessions.add(session);
451         this.wakeup();
452     }
453 
454     /**
455      * Starts the inner Processor, asking the executor to pick a thread in its
456      * pool. The Runnable will be renamed
457      */
458     private void startupProcessor() {
459         Processor processor = processorRef.get();
460 
461         if (processor == null) {
462             processor = new Processor();
463 
464             if (processorRef.compareAndSet(null, processor)) {
465                 executor.execute(new NamePreservingRunnable(processor, threadName));
466             }
467         }
468 
469         // Just stop the select() and start it again, so that the processor
470         // can be activated immediately.
471         wakeup();
472     }
473 
474     /**
475      * In the case we are using the java select() method, this method is used to
476      * trash the buggy selector and create a new one, registering all the sockets
477      * on it.
478      *
479      * @throws IOException
480      *             If we got an exception
481      */
482     abstract protected void registerNewSelector() throws IOException;
483 
484     /**
485      * Checks whether the select() has exited immediately just because of a
486      * broken connection. This is a standard case, in which we just have to
487      * loop.
488      *
489      * @return true if a connection has been brutally closed.
490      * @throws IOException
491      *             If we got an exception
492      */
493     abstract protected boolean isBrokenConnection() throws IOException;
494 
495     /**
496      * Drains the queue of newly added sessions, registering each of them
497      * through {@code addNow}.
498      *
499      * @return the number of sessions which were successfully registered
500      */
501     private int handleNewSessions() {
502         int registeredCount = 0;
503         S candidate;
504 
505         while ((candidate = newSessions.poll()) != null) {
506             if (addNow(candidate)) {
507                 registeredCount++;
508             }
509         }
510 
511         return registeredCount;
512     }
513 
514     /**
515      * Process a new session :
516      * - initialize it
517      * - create its chain
518      * - fire the CREATED listeners if any
519      *
520      * @param session The session to create
521      * @return true if the session has been registered
522      */
523     private boolean addNow(S session) {
524         boolean registered = false;
525 
526         try {
527             init(session);
528             registered = true;
529 
530             // Build the filter chain of this session.
531             IoFilterChainBuilder chainBuilder = session.getService().getFilterChainBuilder();
532             chainBuilder.buildFilterChain(session.getFilterChain());
533 
534             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
535             // in AbstractIoFilterChain.fireSessionOpened().
536             // Propagate the SESSION_CREATED event up to the chain
537             IoServiceListenerSupport listeners = ((AbstractIoService) session.getService()).getListeners();
538             listeners.fireSessionCreated(session);
539         } catch (Exception e) {
540             ExceptionMonitor.getInstance().exceptionCaught(e);
541 
542             try {
543                 destroy(session);
544             } catch (Exception e1) {
545                 ExceptionMonitor.getInstance().exceptionCaught(e1);
546             } finally {
547                 registered = false;
548             }
549         }
550 
551         return registered;
552     }
553 
554     private int removeSessions() {
555         int removedSessions = 0;
556 
557         for (S session = removingSessions.poll(); session != null; session = removingSessions.poll()) {
558             SessionState state = getState(session);
559 
560             // Now deal with the removal accordingly to the session's state
561             switch (state) {
562             case OPENED:
563                 // Try to remove this session
564                 if (removeNow(session)) {
565                     removedSessions++;
566                 }
567 
568                 break;
569 
570             case CLOSING:
571                 // Skip if channel is already closed
572                 break;
573 
574             case OPENING:
575                 // Remove session from the newSessions queue and
576                 // remove it
577                 newSessions.remove(session);
578 
579                 if (removeNow(session)) {
580                     removedSessions++;
581                 }
582 
583                 break;
584 
585             default:
586                 throw new IllegalStateException(String.valueOf(state));
587             }
588         }
589 
590         return removedSessions;
591     }
592 
593     /**
594      * Destroys a session and notifies the service listeners about it. The
595      * pending write requests are discarded before and after the
596      * destruction. Failures are reported through the session's filter chain.
597      *
598      * @param session the session to remove
599      * @return true if the session has been destroyed without error
600      */
601     private boolean removeNow(S session) {
602         clearWriteRequestQueue(session);
603 
604         try {
605             destroy(session);
606             return true;
607         } catch (Exception e) {
608             IoFilterChain filterChain = session.getFilterChain();
609             filterChain.fireExceptionCaught(e);
610         } finally {
611             try {
612                 clearWriteRequestQueue(session);
613                 ((AbstractIoService) session.getService()).getListeners().fireSessionDestroyed(session);
614             } catch (Exception e) {
615                 // An exception escaping a finally block would replace the
616                 // value being returned, so a cleanup failure is reported to
617                 // the filter chain instead of being thrown.
618                 IoFilterChain filterChain = session.getFilterChain();
619                 filterChain.fireExceptionCaught(e);
620             }
621         }
622 
623         return false;
624     }
608 
609     private void clearWriteRequestQueue(S session) {
610         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
611         WriteRequest req;
612 
613         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
614 
615         if ((req = writeRequestQueue.poll(session)) != null) {
616             Object message = req.getMessage();
617 
618             if (message instanceof IoBuffer) {
619                 IoBuffer buf = (IoBuffer) message;
620 
621                 // The first unwritten empty buffer must be
622                 // forwarded to the filter chain.
623                 if (buf.hasRemaining()) {
624                     buf.reset();
625                     failedRequests.add(req);
626                 } else {
627                     IoFilterChain filterChain = session.getFilterChain();
628                     filterChain.fireMessageSent(req);
629                 }
630             } else {
631                 failedRequests.add(req);
632             }
633 
634             // Discard others.
635             while ((req = writeRequestQueue.poll(session)) != null) {
636                 failedRequests.add(req);
637             }
638         }
639 
640         // Create an exception and notify.
641         if (!failedRequests.isEmpty()) {
642             WriteToClosedSessionException cause = new WriteToClosedSessionException(failedRequests);
643 
644             for (WriteRequest r : failedRequests) {
645                 session.decreaseScheduledBytesAndMessages(r);
646                 r.getFuture().setException(cause);
647             }
648 
649             IoFilterChain filterChain = session.getFilterChain();
650             filterChain.fireExceptionCaught(cause);
651         }
652     }
653 
654     private void process() throws Exception {
655         for (Iterator<S> i = selectedSessions(); i.hasNext();) {
656             S session = i.next();
657             process(session);
658             i.remove();
659         }
660     }
661 
662     /**
663      * Handles a session returned by selectedSessions(): reads from it when
664      * it is readable, then queues it for flushing when it is writable. A
665      * direction which is suspended on the session is skipped.
666      *
667      * @param session the session to process
668      */
669     private void process(S session) {
670         if (isReadable(session) && !session.isReadSuspended()) {
671             read(session);
672         }
673 
674         if (isWritable(session) && !session.isWriteSuspended()) {
675             // scheduleFlush() queues the session only if it isn't already
676             scheduleFlush(session);
677         }
678     }
679     /** Reads what is available on a session and fires the matching filter chain events. */
680     private void read(S session) {
681         IoSessionConfig config = session.getConfig();
682         int bufferSize = config.getReadBufferSize();
683         IoBuffer buf = IoBuffer.allocate(bufferSize);
684 
685         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
686 
687         try {
688             int readBytes = 0;
689             int ret;
690 
691             try {
692                 if (hasFragmentation) {
693                     // Keep reading until read() returns 0 or less, or until the buffer is full
694                     while ((ret = read(session, buf)) > 0) {
695                         readBytes += ret;
696 
697                         if (!buf.hasRemaining()) {
698                             break;
699                         }
700                     }
701                 } else {
702                     ret = read(session, buf);
703 
704                     if (ret > 0) {
705                         readBytes = ret;
706                     }
707                 }
708             } finally {
709                 buf.flip();
710             }
711 
712             if (readBytes > 0) {
713                 IoFilterChain filterChain = session.getFilterChain();
714                 filterChain.fireMessageReceived(buf);
715                 buf = null;
716                 // Adapt the read buffer size: shrink it when less than half was used, grow it when it was filled
717                 if (hasFragmentation) {
718                     if (readBytes << 1 < config.getReadBufferSize()) {
719                         session.decreaseReadBufferSize();
720                     } else if (readBytes == config.getReadBufferSize()) {
721                         session.increaseReadBufferSize();
722                     }
723                 }
724             }
725             // A negative read result is reported to the filter chain as a closed input
726             if (ret < 0) {
727                 // scheduleRemove(session);
728                 IoFilterChain filterChain = session.getFilterChain();
729                 filterChain.fireInputClosed();
730             }
731         } catch (Exception e) {
732             if (e instanceof IOException) { // removes the session, unless the datagram config opts out for port-unreachable
733                 if (!(e instanceof PortUnreachableException)
734                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
735                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
736                     scheduleRemove(session);
737                 }
738             }
739             // Whatever the exception is, the filter chain is told about it
740             IoFilterChain filterChain = session.getFilterChain();
741             filterChain.fireExceptionCaught(e);
742         }
743     }
744 
745     private void notifyIdleSessions(long currentTime) throws Exception {
746         if (currentTime - lastIdleCheckTime < SELECT_TIMEOUT) {
747             return; // the previous idle check is less than SELECT_TIMEOUT old
748         }
749         lastIdleCheckTime = currentTime;
750         AbstractIoSession.notifyIdleness(allSessions(), currentTime);
751     }
752 
753     /**
754      * Write all the pending messages
755      */
756     private void flush(long currentTime) {
757         if (flushingSessions.isEmpty()) {
758             return;
759         }
760 
761         do {
762             S session = flushingSessions.poll(); // the same one with firstSession
763 
764             if (session == null) {
765                 // Just in case ... It should not happen.
766                 break;
767             }
768 
769             // Reset the Schedule for flush flag for this session,
770             // as we are flushing it now
771             session.unscheduledForFlush();
772 
773             SessionState state = getState(session);
774 
775             switch (state) {
776             case OPENED:
777                 try {
778                     boolean flushedAll = flushNow(session, currentTime);
779 
780                     if (flushedAll && !session.getWriteRequestQueue().isEmpty(session)
781                             && !session.isScheduledForFlush()) {
782                         scheduleFlush(session);
783                     }
784                 } catch (Exception e) {
785                     scheduleRemove(session);
786                     IoFilterChain filterChain = session.getFilterChain();
787                     filterChain.fireExceptionCaught(e);
788                 }
789 
790                 break;
791 
792             case CLOSING:
793                 // Skip if the channel is already closed.
794                 break;
795 
796             case OPENING:
797                 // Retry later if session is not yet fully initialized.
798                 // (In case that Session.write() is called before addSession()
799                 // is processed)
800                 scheduleFlush(session);
801                 return;
802 
803             default:
804                 throw new IllegalStateException(String.valueOf(state));
805             }
806 
807         } while (!flushingSessions.isEmpty());
808     }
809     /** Writes the pending requests of a session; returns false whenever it stops before the queue is drained. */
810     private boolean flushNow(S session, long currentTime) {
811         if (!session.isConnected()) {
812             scheduleRemove(session);
813             return false;
814         }
815 
816         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
817 
818         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
819 
820         // Set limitation for the number of written bytes for read-write
821         // fairness. I used maxReadBufferSize * 3 / 2, which yields best
822         // performance in my experience while not breaking fairness much.
823         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
824                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
825         int writtenBytes = 0;
826         WriteRequest req = null;
827 
828         try {
829             // Clear OP_WRITE
830             setInterestedInWrite(session, false);
831 
832             do {
833                 // Resume the request in progress, if any, before polling the queue
834                 req = session.getCurrentWriteRequest();
835 
836                 if (req == null) {
837                     req = writeRequestQueue.poll(session);
838 
839                     if (req == null) {
840                         break;
841                     }
842 
843                     session.setCurrentWriteRequest(req);
844                 }
845 
846                 int localWrittenBytes = 0;
847                 Object message = req.getMessage();
848 
849                 if (message instanceof IoBuffer) {
850                     localWrittenBytes = writeBuffer(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
851                             currentTime);
852 
853                     if ((localWrittenBytes > 0) && ((IoBuffer) message).hasRemaining()) {
854                         // the buffer was only partly written, we re-interest the session in writing
855                         writtenBytes += localWrittenBytes;
856                         setInterestedInWrite(session, true);
857                         return false;
858                     }
859                 } else if (message instanceof FileRegion) {
860                     localWrittenBytes = writeFile(session, req, hasFragmentation, maxWrittenBytes - writtenBytes,
861                             currentTime);
862 
863                     // Fix for Java bug on Linux
864                     // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
865                     // If there's still data to be written in the FileRegion,
866                     // return false indicating that we need
867                     // to pause until writing may resume.
868                     if ((localWrittenBytes > 0) && (((FileRegion) message).getRemainingBytes() > 0)) {
869                         writtenBytes += localWrittenBytes;
870                         setInterestedInWrite(session, true);
871                         return false;
872                     }
873                 } else {
874                     throw new IllegalStateException("Don't know how to handle message of type '"
875                             + message.getClass().getName() + "'.  Are you missing a protocol encoder?");
876                 }
877 
878                 if (localWrittenBytes == 0) {
879                     // Kernel buffer is full.
880                     setInterestedInWrite(session, true);
881                     return false;
882                 }
883 
884                 writtenBytes += localWrittenBytes;
885 
886                 if (writtenBytes >= maxWrittenBytes) {
887                     // Wrote too much: the fairness limit is reached, so the session is queued again for a later flush
888                     scheduleFlush(session);
889                     return false;
890                 }
891                 // Reaching this point with an IoBuffer presumably means it has nothing remaining: release it
892                 if (message instanceof IoBuffer) {
893                     ((IoBuffer) message).free();
894                 }
895             } while (writtenBytes < maxWrittenBytes);
896         } catch (Exception e) {
897             if (req != null) {
898                 req.getFuture().setException(e);
899             }
900             // The failure is reported to the filter chain as well
901             IoFilterChain filterChain = session.getFilterChain();
902             filterChain.fireExceptionCaught(e);
903             return false;
904         }
905 
906         return true;
907     }
908 
909     private int writeBuffer(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
910             throws Exception {
911         IoBuffer buf = (IoBuffer) req.getMessage();
912         int localWrittenBytes = 0;
913 
914         if (buf.hasRemaining()) {
915             int length;
916 
917             if (hasFragmentation) {
918                 length = Math.min(buf.remaining(), maxLength);
919             } else {
920                 length = buf.remaining();
921             }
922 
923             try {
924                 localWrittenBytes = write(session, buf, length);
925             } catch (IOException ioe) {
926                 // We have had an issue while trying to send data to the
927                 // peer : let's close the session.
928                 buf.free();
929                 session.close(true);
930                 destroy(session);
931 
932                 return 0;
933             }
934 
935         }
936 
937         session.increaseWrittenBytes(localWrittenBytes, currentTime);
938 
939         if (!buf.hasRemaining() || (!hasFragmentation && (localWrittenBytes != 0))) {
940             // Buffer has been sent, clear the current request.
941             int pos = buf.position();
942             buf.reset();
943 
944             fireMessageSent(session, req);
945 
946             // And set it back to its position
947             buf.position(pos);
948         }
949 
950         return localWrittenBytes;
951     }
952 
953     private int writeFile(S session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)
954             throws Exception {
955         int localWrittenBytes;
956         FileRegion region = (FileRegion) req.getMessage();
957 
958         if (region.getRemainingBytes() > 0) {
959             int length;
960 
961             if (hasFragmentation) {
962                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
963             } else {
964                 length = (int) Math.min(Integer.MAX_VALUE, region.getRemainingBytes());
965             }
966 
967             localWrittenBytes = transferFile(session, region, length);
968             region.update(localWrittenBytes);
969         } else {
970             localWrittenBytes = 0;
971         }
972 
973         session.increaseWrittenBytes(localWrittenBytes, currentTime);
974 
975         if ((region.getRemainingBytes() <= 0) || (!hasFragmentation && (localWrittenBytes != 0))) {
976             fireMessageSent(session, req);
977         }
978 
979         return localWrittenBytes;
980     }
981 
982     private void fireMessageSent(S session, WriteRequest req) {
983         session.setCurrentWriteRequest(null);
984         IoFilterChain filterChain = session.getFilterChain();
985         filterChain.fireMessageSent(req);
986     }
987 
988     /**
989      * Update the trafficControl for all the session.
990      */
991     private void updateTrafficMask() {
992         int queueSize = trafficControllingSessions.size();
993 
994         while (queueSize > 0) {
995             S session = trafficControllingSessions.poll();
996 
997             if (session == null) {
998                 // We are done with this queue.
999                 return;
1000             }
1001 
1002             SessionState state = getState(session);
1003 
1004             switch (state) {
1005             case OPENED:
1006                 updateTrafficControl(session);
1007 
1008                 break;
1009 
1010             case CLOSING:
1011                 break;
1012 
1013             case OPENING:
1014                 // Retry later if session is not yet fully initialized.
1015                 // (In case that Session.suspend??() or session.resume??() is
1016                 // called before addSession() is processed)
1017                 // We just put back the session at the end of the queue.
1018                 trafficControllingSessions.add(session);
1019                 break;
1020 
1021             default:
1022                 throw new IllegalStateException(String.valueOf(state));
1023             }
1024 
1025             // As we have handled one session, decrement the number of
1026             // remaining sessions. The OPENING session will be processed
1027             // with the next select(), as the queue size has been decreased, even
1028             // if the session has been pushed at the end of the queue
1029             queueSize--;
1030         }
1031     }
1032 
1033     /**
1034      * {@inheritDoc}
1035      */
1036     public void updateTrafficControl(S session) {
1037         //
1038         try {
1039             setInterestedInRead(session, !session.isReadSuspended());
1040         } catch (Exception e) {
1041             IoFilterChain filterChain = session.getFilterChain();
1042             filterChain.fireExceptionCaught(e);
1043         }
1044 
1045         try {
1046             setInterestedInWrite(session,
1047                     !session.getWriteRequestQueue().isEmpty(session) && !session.isWriteSuspended());
1048         } catch (Exception e) {
1049             IoFilterChain filterChain = session.getFilterChain();
1050             filterChain.fireExceptionCaught(e);
1051         }
1052     }
1053 
1054     /**
1055      * The main loop. This is the place in charge to poll the Selector, and to
1056      * process the active sessions. It's done in
1057      * - handle the newly created sessions
1058      * -
1059      */
1060     private class Processor implements Runnable {
1061         public void run() {
1062             assert (processorRef.get() == this);
1063 
1064             int nSessions = 0;
1065             lastIdleCheckTime = System.currentTimeMillis();
1066 
1067             for (;;) {
1068                 try {
1069                     // This select has a timeout so that we can manage
1070                     // idle session when we get out of the select every
1071                     // second. (note : this is a hack to avoid creating
1072                     // a dedicated thread).
1073                     long t0 = System.currentTimeMillis();
1074                     int selected = select(SELECT_TIMEOUT);
1075                     long t1 = System.currentTimeMillis();
1076                     long delta = (t1 - t0);
1077 
1078                     if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {
1079                         // Last chance : the select() may have been
1080                         // interrupted because we have had an closed channel.
1081                         if (isBrokenConnection()) {
1082                             LOG.warn("Broken connection");
1083 
1084                             // we can reselect immediately
1085                             // set back the flag to false
1086                             wakeupCalled.getAndSet(false);
1087 
1088                             continue;
1089                         } else {
1090                             LOG.warn("Create a new selector. Selected is 0, delta = " + (t1 - t0));
1091                             // Ok, we are hit by the nasty epoll
1092                             // spinning.
1093                             // Basically, there is a race condition
1094                             // which causes a closing file descriptor not to be
1095                             // considered as available as a selected channel, but
1096                             // it stopped the select. The next time we will
1097                             // call select(), it will exit immediately for the same
1098                             // reason, and do so forever, consuming 100%
1099                             // CPU.
1100                             // We have to destroy the selector, and
1101                             // register all the socket on a new one.
1102                             registerNewSelector();
1103                         }
1104 
1105                         // Set back the flag to false
1106                         wakeupCalled.getAndSet(false);
1107 
1108                         // and continue the loop
1109                         continue;
1110                     }
1111 
1112                     // Manage newly created session first
1113                     nSessions += handleNewSessions();
1114 
1115                     updateTrafficMask();
1116 
1117                     // Now, if we have had some incoming or outgoing events,
1118                     // deal with them
1119                     if (selected > 0) {
1120                         //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...
1121                         process();
1122                     }
1123 
1124                     // Write the pending requests
1125                     long currentTime = System.currentTimeMillis();
1126                     flush(currentTime);
1127 
1128                     // And manage removed sessions
1129                     nSessions -= removeSessions();
1130 
1131                     // Last, not least, send Idle events to the idle sessions
1132                     notifyIdleSessions(currentTime);
1133 
1134                     // Get a chance to exit the infinite loop if there are no
1135                     // more sessions on this Processor
1136                     if (nSessions == 0) {
1137                         processorRef.set(null);
1138 
1139                         if (newSessions.isEmpty() && isSelectorEmpty()) {
1140                             // newSessions.add() precedes startupProcessor
1141                             assert (processorRef.get() != this);
1142                             break;
1143                         }
1144 
1145                         assert (processorRef.get() != this);
1146 
1147                         if (!processorRef.compareAndSet(null, this)) {
1148                             // startupProcessor won race, so must exit processor
1149                             assert (processorRef.get() != this);
1150                             break;
1151                         }
1152 
1153                         assert (processorRef.get() == this);
1154                     }
1155 
1156                     // Disconnect all sessions immediately if disposal has been
1157                     // requested so that we exit this loop eventually.
1158                     if (isDisposing()) {
1159                         for (Iterator<S> i = allSessions(); i.hasNext();) {
1160                             scheduleRemove(i.next());
1161                         }
1162 
1163                         wakeup();
1164                     }
1165                 } catch (ClosedSelectorException cse) {
1166                     // If the selector has been closed, we can exit the loop
1167                     // But first, dump a stack trace
1168                     ExceptionMonitor.getInstance().exceptionCaught(cse);
1169                     break;
1170                 } catch (Exception e) {
1171                     ExceptionMonitor.getInstance().exceptionCaught(e);
1172 
1173                     try {
1174                         Thread.sleep(1000);
1175                     } catch (InterruptedException e1) {
1176                         ExceptionMonitor.getInstance().exceptionCaught(e1);
1177                     }
1178                 }
1179             }
1180 
1181             try {
1182                 synchronized (disposalLock) {
1183                     if (disposing) {
1184                         doDispose();
1185                     }
1186                 }
1187             } catch (Exception e) {
1188                 ExceptionMonitor.getInstance().exceptionCaught(e);
1189             } finally {
1190                 disposalFuture.setValue(true);
1191             }
1192         }
1193     }
1194 }