View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.polling;
21  
22  import java.io.IOException;
23  import java.net.PortUnreachableException;
24  import java.util.ArrayList;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.Queue;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.Executor;
32  import java.util.concurrent.atomic.AtomicInteger;
33  
34  import org.apache.mina.core.buffer.IoBuffer;
35  import org.apache.mina.core.file.FileRegion;
36  import org.apache.mina.core.filterchain.IoFilterChain;
37  import org.apache.mina.core.future.DefaultIoFuture;
38  import org.apache.mina.core.service.AbstractIoService;
39  import org.apache.mina.core.service.IoProcessor;
40  import org.apache.mina.core.session.AbstractIoSession;
41  import org.apache.mina.core.session.IoSession;
42  import org.apache.mina.core.session.IoSessionConfig;
43  import org.apache.mina.core.session.SessionState;
44  import org.apache.mina.core.write.WriteRequest;
45  import org.apache.mina.core.write.WriteRequestQueue;
46  import org.apache.mina.core.write.WriteToClosedSessionException;
47  import org.apache.mina.transport.socket.AbstractDatagramSessionConfig;
48  import org.apache.mina.util.ExceptionMonitor;
49  import org.apache.mina.util.NamePreservingRunnable;
50  import org.slf4j.Logger;
51  import org.slf4j.LoggerFactory;
52  
53  /**
54   * An abstract implementation of {@link IoProcessor} which helps
55   * transport developers to write an {@link IoProcessor} easily.
56   * This class is in charge of active polling a set of {@link IoSession}
57   * and trigger events when some I/O operation is possible.
58   *
59   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
60   */
61  public abstract class AbstractPollingIoProcessor<T extends AbstractIoSession>
62          implements IoProcessor<T> {
63      /** A logger for this class */
64      private final static Logger LOG = LoggerFactory.getLogger(IoProcessor.class);
65      
66      /**
67       * The maximum loop count for a write operation until
68       * {@link #write(AbstractIoSession, IoBuffer, int)} returns non-zero value.
69       * It is similar to what a spin lock is for in concurrency programming.
70       * It improves memory utilization and write throughput significantly.
71       */
72      private static final int WRITE_SPIN_COUNT = 256;
73  
74      /** A timeout used for the select, as we need to get out to deal with idle sessions */
75      private static final long SELECT_TIMEOUT = 1000L;
76  
77      /** A map containing the last Thread ID for each class */
78      private static final Map<Class<?>, AtomicInteger> threadIds = new HashMap<Class<?>, AtomicInteger>();
79  
80      private final Object lock = new Object();
81  
82      private final String threadName;
83  
84      private final Executor executor;
85  
86      /** A Session queue containing the newly created sessions */
87      private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
88  
89      /** A queue used to store the sessions to be removed */
90      private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
91  
92      /** A queue used to store the sessions to be flushed */
93      private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
94  
95      /** A queue used to store the sessions which have a trafficControl to be updated */
96      private final Queue<T> trafficControllingSessions = new ConcurrentLinkedQueue<T>();
97  
98      /** The processor thread : it handles the incoming messages */
99      private Processor processor;
100 
101     private long lastIdleCheckTime;
102 
103     private final Object disposalLock = new Object();
104 
105     private volatile boolean disposing;
106 
107     private volatile boolean disposed;
108 
109     private final DefaultIoFuture disposalFuture = new DefaultIoFuture(null);
110 
111     /**
112      * Create an {@link AbstractPollingIoProcessor} with the given {@link Executor}
113      * for handling I/Os events.
114      * 
115      * @param executor the {@link Executor} for handling I/O events
116      */
117     protected AbstractPollingIoProcessor(Executor executor) {
118         if (executor == null) {
119             throw new NullPointerException("executor");
120         }
121 
122         this.threadName = nextThreadName();
123         this.executor = executor;
124     }
125 
126     /**
127      * Compute the thread ID for this class instance. As we may have different
128      * classes, we store the last ID number into a Map associating the class
129      * name to the last assigned ID.
130      *   
131      * @return a name for the current thread, based on the class name and
132      * an incremental value, starting at 1. 
133      */
134     private String nextThreadName() {
135         Class<?> cls = getClass();
136         int newThreadId;
137 
138         // We synchronize this block to avoid a concurrent access to 
139         // the actomicInteger (it can be modified by another thread, while
140         // being seen as null by another thread)
141         synchronized (threadIds) {
142             // Get the current ID associated to this class' name
143             AtomicInteger threadId = threadIds.get(cls);
144 
145             if (threadId == null) {
146                 // We never have seen this class before, just create a
147                 // new ID starting at 1 for it, and associate this ID
148                 // with the class name in the map.
149                 newThreadId = 1;
150                 threadIds.put(cls, new AtomicInteger(newThreadId));
151             } else {
152                 // Just increment the lat ID, and get it.
153                 newThreadId = threadId.incrementAndGet();
154             }
155         }
156 
157         // Now we can compute the name for this thread
158         return cls.getSimpleName() + '-' + newThreadId;
159     }
160 
161     /**
162      * {@inheritDoc}
163      */
164     public final boolean isDisposing() {
165         return disposing;
166     }
167 
168     /**
169      * {@inheritDoc}
170      */
171     public final boolean isDisposed() {
172         return disposed;
173     }
174 
175     /**
176      * {@inheritDoc}
177      */
178     public final void dispose() {
179         if (disposed) {
180             return;
181         }
182 
183         synchronized (disposalLock) {
184             if (!disposing) {
185                 disposing = true;
186                 startupProcessor();
187             }
188         }
189 
190         disposalFuture.awaitUninterruptibly();
191         disposed = true;
192     }
193 
194     /**
195      * Dispose the resources used by this {@link IoProcessor} for polling 
196      * the client connections
197      * @throws Exception if some low level IO error occurs
198      */
199     protected abstract void dispose0() throws Exception;
200 
201     /**
202      * poll those sessions for the given timeout
203      * @param timeout milliseconds before the call timeout if no event appear
204      * @return The number of session ready for read or for write
205      * @throws Exception if some low level IO error occurs
206      */
207     protected abstract int select(long timeout) throws Exception;
208 
209     /**
210      * poll those sessions forever
211      * @return The number of session ready for read or for write
212      * @throws Exception if some low level IO error occurs
213      */
214     protected abstract int select() throws Exception;
215 
216     /**
217      * Say if the list of {@link IoSession} polled by this {@link IoProcessor} 
218      * is empty
219      * @return true if at least a session is managed by this {@link IoProcessor}
220      */
221     protected abstract boolean isSelectorEmpty();
222 
223     /**
224      * Interrupt the {@link AbstractPollingIoProcessor#select(int) call.
225      */
226     protected abstract void wakeup();
227 
228     /**
229      * Get an {@link Iterator} for the list of {@link IoSession} polled by this
230      * {@link IoProcessor}   
231      * @return {@link Iterator} of {@link IoSession}
232      */
233     protected abstract Iterator<T> allSessions();
234 
235     /**
236      * Get an {@link Iterator} for the list of {@link IoSession} found selected 
237      * by the last call of {@link AbstractPollingIoProcessor#select(int)
238      * @return {@link Iterator} of {@link IoSession} read for I/Os operation
239      */
240     protected abstract Iterator<T> selectedSessions();
241 
242     /**
243      * Get the state of a session (preparing, open, closed)
244      * @param session the {@link IoSession} to inspect
245      * @return the state of the session
246      */
247     protected abstract SessionState getState(T session);
248 
249     /**
250      * Is the session ready for writing
251      * @param session the session queried
252      * @return true is ready, false if not ready
253      */
254     protected abstract boolean isWritable(T session);
255 
256     /**
257      * Is the session ready for reading
258      * @param session the session queried
259      * @return true is ready, false if not ready
260      */
261     protected abstract boolean isReadable(T session);
262 
263     /**
264      * register a session for writing
265      * @param session the session registered
266      * @param isInterested true for registering, false for removing
267      */
268     protected abstract void setInterestedInWrite(T session, boolean isInterested)
269             throws Exception;
270 
271     /**
272      * register a session for reading
273      * @param session the session registered
274      * @param isInterested true for registering, false for removing
275      */
276     protected abstract void setInterestedInRead(T session, boolean isInterested)
277             throws Exception;
278 
279     /**
280      * is this session registered for reading
281      * @param session the session queried
282      * @return true is registered for reading
283      */
284     protected abstract boolean isInterestedInRead(T session);
285 
286     /**
287      * is this session registered for writing
288      * @param session the session queried
289      * @return true is registered for writing
290      */
291     protected abstract boolean isInterestedInWrite(T session);
292 
293     /**
294      * Initialize the polling of a session. Add it to the polling process. 
295      * @param session the {@link IoSession} to add to the polling
296      * @throws Exception any exception thrown by the underlying system calls
297      */
298     protected abstract void init(T session) throws Exception;
299 
300     /**
301      * Destroy the underlying client socket handle
302      * @param session the {@link IoSession}
303      * @throws Exception any exception thrown by the underlying system calls
304      */
305     protected abstract void destroy(T session) throws Exception;
306 
307     /**
308      * Reads a sequence of bytes from a {@link IoSession} into the given {@link IoBuffer}. 
309      * Is called when the session was found ready for reading.
310      * @param session the session to read
311      * @param buf the buffer to fill
312      * @return the number of bytes read
313      * @throws Exception any exception thrown by the underlying system calls
314      */
315     protected abstract int read(T session, IoBuffer buf) throws Exception;
316 
317     /**
318      * Write a sequence of bytes to a {@link IoSession}, means to be called when a session
319      * was found ready for writing.
320      * @param session the session to write
321      * @param buf the buffer to write
322      * @param length the number of bytes to write can be superior to the number of bytes remaining
323      * in the buffer
324      * @return the number of byte written
325      * @throws Exception any exception thrown by the underlying system calls
326      */
327     protected abstract int write(T session, IoBuffer buf, int length)
328             throws Exception;
329 
330     /**
331      * Write a part of a file to a {@link IoSession}, if the underlying API isn't supporting
332      * system calls like sendfile(), you can throw a {@link UnsupportedOperationException} so 
333      * the file will be send using usual {@link #write(AbstractIoSession, IoBuffer, int)} call. 
334      * @param session the session to write
335      * @param region the file region to write
336      * @param length the length of the portion to send
337      * @return the number of written bytes
338      * @throws Exception any exception thrown by the underlying system calls
339      */
340     protected abstract int transferFile(T session, FileRegion region, int length)
341             throws Exception;
342 
343     /**
344      * {@inheritDoc}
345      */
346     public final void add(T session) {
347         if (isDisposing()) {
348             throw new IllegalStateException("Already disposed.");
349         }
350 
351         // Adds the session to the newSession queue and starts the worker
352         newSessions.add(session);
353         startupProcessor();
354     }
355 
356     /**
357      * {@inheritDoc}
358      */
359     public final void remove(T session) {
360         scheduleRemove(session);
361         startupProcessor();
362     }
363 
364     private void scheduleRemove(T session) {
365         removingSessions.add(session);
366     }
367 
368     /**
369      * {@inheritDoc}
370      */
371     public final void flush(T session) {
372         boolean needsWakeup = flushingSessions.isEmpty();
373         if (scheduleFlush(session) && needsWakeup) {
374             wakeup();
375         }
376     }
377 
378     private boolean scheduleFlush(T session) {
379         if (session.setScheduledForFlush(true)) {
380             // add the session to the queue
381             flushingSessions.add(session);
382             return true;
383         }
384         return false;
385     }
386 
387     /**
388      * {@inheritDoc}
389      */
390     public final void updateTrafficMask(T session) {
391         scheduleTrafficControl(session);
392         wakeup();
393     }
394 
395     private void scheduleTrafficControl(T session) {
396         trafficControllingSessions.add(session);
397     }
398 
399     /**
400      * Starts the inner Processor, asking the executor to pick a thread in its
401      * pool. The Runnable will be renamed 
402      */
403     private void startupProcessor() {
404         synchronized (lock) {
405             if (processor == null) {
406                 processor = new Processor();
407                 executor.execute(new NamePreservingRunnable(processor,
408                         threadName));
409             }
410         }
411 
412         // Just stop the select() and start it again, so that the processor
413         // can be activated immediately. 
414         wakeup();
415     }
416 
417     /**
418      * Loops over the new sessions blocking queue and returns
419      * the number of sessions which are effectively created
420      *
421      * @return The number of new sessions
422      */
423     private int handleNewSessions() {
424         int addedSessions = 0;
425 
426         for (;;) {
427             T session = newSessions.poll();
428 
429             if (session == null) {
430                 // All new sessions have been handled
431                 break;
432             }
433 
434             if (addNow(session)) {
435                 // A new session has been created 
436                 addedSessions++;
437             }
438         }
439 
440         return addedSessions;
441     }
442 
443     private boolean addNow(T session) {
444         boolean registered = false;
445         boolean notified = false;
446         
447         try {
448             init(session);
449             registered = true;
450 
451             // Build the filter chain of this session.
452             session.getService().getFilterChainBuilder().buildFilterChain(
453                     session.getFilterChain());
454 
455             // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
456             // in AbstractIoFilterChain.fireSessionOpened().
457             ((AbstractIoService) session.getService()).getListeners()
458                     .fireSessionCreated(session);
459             notified = true;
460         } catch (Throwable e) {
461             if (notified) {
462                 // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
463                 // and call ConnectFuture.setException().
464                 scheduleRemove(session);
465                 IoFilterChain filterChain = session.getFilterChain();
466                 filterChain.fireExceptionCaught(e);
467                 wakeup();
468             } else {
469                 ExceptionMonitor.getInstance().exceptionCaught(e);
470                 try {
471                     destroy(session);
472                 } catch (Exception e1) {
473                     ExceptionMonitor.getInstance().exceptionCaught(e1);
474                 } finally {
475                     registered = false;
476                 }
477             }
478         }
479         return registered;
480     }
481 
482     private int removeSessions() {
483         int removedSessions = 0;
484         
485         for (;;) {
486             T session = removingSessions.poll();
487 
488             if (session == null) {
489                 // No session to remove. Get out.
490                 return removedSessions;
491             }
492 
493             SessionState state = getState(session);
494             
495             switch (state) {
496                 case OPENED:
497                     if (removeNow(session)) {
498                         removedSessions++;
499                     }
500                     
501                     break;
502                     
503                 case CLOSING:
504                     // Skip if channel is already closed
505                     break;
506                     
507                 case OPENING:
508                     // Remove session from the newSessions queue and
509                     // remove it
510                     newSessions.remove(session);
511                     
512                     if (removeNow(session)) {
513                         removedSessions++;
514                     }
515                     
516                     break;
517                     
518                 default:
519                     throw new IllegalStateException(String.valueOf(state));
520             }
521         }
522     }
523 
524     private boolean removeNow(T session) {
525         clearWriteRequestQueue(session);
526 
527         try {
528             destroy(session);
529             return true;
530         } catch (Exception e) {
531             IoFilterChain filterChain = session.getFilterChain();
532             filterChain.fireExceptionCaught(e);
533         } finally {
534             clearWriteRequestQueue(session);
535             ((AbstractIoService) session.getService()).getListeners()
536                     .fireSessionDestroyed(session);
537         }
538         return false;
539     }
540 
541     private void clearWriteRequestQueue(T session) {
542         WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
543         WriteRequest req;
544 
545         List<WriteRequest> failedRequests = new ArrayList<WriteRequest>();
546 
547         if ((req = writeRequestQueue.poll(session)) != null) {
548             Object m = req.getMessage();
549             if (m instanceof IoBuffer) {
550                 IoBuffer buf = (IoBuffer) req.getMessage();
551 
552                 // The first unwritten empty buffer must be
553                 // forwarded to the filter chain.
554                 if (buf.hasRemaining()) {
555                     buf.reset();
556                     failedRequests.add(req);
557                 } else {
558                     IoFilterChain filterChain = session.getFilterChain();
559                     filterChain.fireMessageSent(req);
560                 }
561             } else {
562                 failedRequests.add(req);
563             }
564 
565             // Discard others.
566             while ((req = writeRequestQueue.poll(session)) != null) {
567                 failedRequests.add(req);
568             }
569         }
570 
571         // Create an exception and notify.
572         if (!failedRequests.isEmpty()) {
573             WriteToClosedSessionException cause = new WriteToClosedSessionException(
574                     failedRequests);
575             for (WriteRequest r : failedRequests) {
576                 session.decreaseScheduledBytesAndMessages(r);
577                 r.getFuture().setException(cause);
578             }
579             IoFilterChain filterChain = session.getFilterChain();
580             filterChain.fireExceptionCaught(cause);
581         }
582     }
583 
584     private void process() throws Exception {
585         for (Iterator<T> i = selectedSessions(); i.hasNext();) {
586             T session = i.next();
587             process(session);
588             i.remove();
589         }
590     }
591 
592     /**
593      * Deal with session ready for the read or write operations, or both.
594      */
595     private void process(T session) {
596         // Process Reads
597         if (isReadable(session) && !session.isReadSuspended()) {
598             read(session);
599         }
600 
601         // Process writes
602         if (isWritable(session) && !session.isWriteSuspended()) {
603             scheduleFlush(session);
604         }
605     }
606 
607     private void read(T session) {
608         IoSessionConfig config = session.getConfig();
609         IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
610 
611         final boolean hasFragmentation = session.getTransportMetadata()
612                 .hasFragmentation();
613 
614         try {
615             int readBytes = 0;
616             int ret;
617 
618             try {
619                 if (hasFragmentation) {
620                     while ((ret = read(session, buf)) > 0) {
621                         readBytes += ret;
622                         if (!buf.hasRemaining()) {
623                             break;
624                         }
625                     }
626                 } else {
627                     ret = read(session, buf);
628                     if (ret > 0) {
629                         readBytes = ret;
630                     }
631                 }
632             } finally {
633                 buf.flip();
634             }
635 
636             if (readBytes > 0) {
637                 IoFilterChain filterChain = session.getFilterChain();
638                 filterChain.fireMessageReceived(buf);
639                 buf = null;
640 
641                 if (hasFragmentation) {
642                     if (readBytes << 1 < config.getReadBufferSize()) {
643                         session.decreaseReadBufferSize();
644                     } else if (readBytes == config.getReadBufferSize()) {
645                         session.increaseReadBufferSize();
646                     }
647                 }
648             }
649             
650             if (ret < 0) {
651                 scheduleRemove(session);
652             }
653         } catch (Throwable e) {
654             if (e instanceof IOException) {
655                 if (!(e instanceof PortUnreachableException)
656                         || !AbstractDatagramSessionConfig.class
657                                 .isAssignableFrom(config.getClass())
658                         || ((AbstractDatagramSessionConfig) config)
659                                 .isCloseOnPortUnreachable())
660 
661                     scheduleRemove(session);
662             }
663             
664             IoFilterChain filterChain = session.getFilterChain();
665             filterChain.fireExceptionCaught(e);
666         }
667     }
668 
669     private void notifyIdleSessions(long currentTime) throws Exception {
670         // process idle sessions
671         if (currentTime - lastIdleCheckTime >= SELECT_TIMEOUT) {
672             lastIdleCheckTime = currentTime;
673             AbstractIoSession.notifyIdleness(allSessions(), currentTime);
674         }
675     }
676 
677     private void flush(long currentTime) {
678         final T firstSession = flushingSessions.peek();
679         if (firstSession == null) {
680             return;
681         }
682 
683         T session = flushingSessions.poll(); // the same one with firstSession
684         
685         for (;;) {
686             session.setScheduledForFlush(false);
687             SessionState state = getState(session);
688 
689             switch (state) {
690                 case OPENED:
691                     try {
692                         boolean flushedAll = flushNow(session, currentTime);
693                         if (flushedAll
694                                 && !session.getWriteRequestQueue().isEmpty(session)
695                                 && !session.isScheduledForFlush()) {
696                             scheduleFlush(session);
697                         }
698                     } catch (Exception e) {
699                         scheduleRemove(session);
700                         IoFilterChain filterChain = session.getFilterChain();
701                         filterChain.fireExceptionCaught(e);
702                     }
703                     
704                     break;
705                     
706                 case CLOSING:
707                     // Skip if the channel is already closed.
708                     break;
709                     
710                 case OPENING:
711                     // Retry later if session is not yet fully initialized.
712                     // (In case that Session.write() is called before addSession() is processed)
713                     scheduleFlush(session);
714                     return;
715                     
716                 default:
717                     throw new IllegalStateException(String.valueOf(state));
718             }
719 
720             session = flushingSessions.peek();
721             if (session == null || session == firstSession) {
722                 break;
723             }
724             session = flushingSessions.poll();
725         }
726     }
727 
728     private boolean flushNow(T session, long currentTime) {
729         if (!session.isConnected()) {
730             scheduleRemove(session);
731             return false;
732         }
733 
734         final boolean hasFragmentation = session.getTransportMetadata()
735                 .hasFragmentation();
736 
737         final WriteRequestQueue writeRequestQueue = session
738                 .getWriteRequestQueue();
739 
740         // Set limitation for the number of written bytes for read-write
741         // fairness.  I used maxReadBufferSize * 3 / 2, which yields best
742         // performance in my experience while not breaking fairness much.
743         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
744                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
745         int writtenBytes = 0;
746         WriteRequest req = null;
747         try {
748             // Clear OP_WRITE
749             setInterestedInWrite(session, false);
750             do {
751                 // Check for pending writes.
752                 req = session.getCurrentWriteRequest();
753                 if (req == null) {
754                     req = writeRequestQueue.poll(session);
755                     if (req == null) {
756                         break;
757                     }
758                     session.setCurrentWriteRequest(req);
759                 }
760 
761                 int localWrittenBytes = 0;
762                 Object message = req.getMessage();
763                 if (message instanceof IoBuffer) {
764                     localWrittenBytes = writeBuffer(session, req,
765                             hasFragmentation, maxWrittenBytes - writtenBytes,
766                             currentTime);
767                     if (localWrittenBytes > 0
768                             && ((IoBuffer) message).hasRemaining()) {
769                         // the buffer isn't empty, we re-interest it in writing 
770                         writtenBytes += localWrittenBytes;
771                         setInterestedInWrite(session, true);
772                         return false;
773                     }
774                 } else if (message instanceof FileRegion) {
775                     localWrittenBytes = writeFile(session, req,
776                             hasFragmentation, maxWrittenBytes - writtenBytes,
777                             currentTime);
778 
779                     // Fix for Java bug on Linux http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=5103988
780                     // If there's still data to be written in the FileRegion, return 0 indicating that we need
781                     // to pause until writing may resume.
782                     if (localWrittenBytes > 0
783                             && ((FileRegion) message).getRemainingBytes() > 0) {
784                         writtenBytes += localWrittenBytes;
785                         setInterestedInWrite(session, true);
786                         return false;
787                     }
788                 } else {
789                     throw new IllegalStateException(
790                             "Don't know how to handle message of type '"
791                                     + message.getClass().getName()
792                                     + "'.  Are you missing a protocol encoder?");
793                 }
794 
795                 if (localWrittenBytes == 0) {
796                     // Kernel buffer is full.
797                     setInterestedInWrite(session, true);
798                     return false;
799                 }
800 
801                 writtenBytes += localWrittenBytes;
802 
803                 if (writtenBytes >= maxWrittenBytes) {
804                     // Wrote too much
805                     scheduleFlush(session);
806                     return false;
807                 }
808             } while (writtenBytes < maxWrittenBytes);
809         } catch (Exception e) {
810         	if (req != null) {
811         		req.getFuture().setException(e);
812         	}
813             IoFilterChain filterChain = session.getFilterChain();
814             filterChain.fireExceptionCaught(e);
815             return false;
816         }
817 
818         return true;
819     }
820 
821     private int writeBuffer(T session, WriteRequest req,
822             boolean hasFragmentation, int maxLength, long currentTime)
823             throws Exception {
824         IoBuffer buf = (IoBuffer) req.getMessage();
825         int localWrittenBytes = 0;
826         if (buf.hasRemaining()) {
827             int length;
828             if (hasFragmentation) {
829                 length = Math.min(buf.remaining(), maxLength);
830             } else {
831                 length = buf.remaining();
832             }
833             for (int i = WRITE_SPIN_COUNT; i > 0; i--) {
834                 localWrittenBytes = write(session, buf, length);
835                 if (localWrittenBytes != 0) {
836                     break;
837                 }
838             }
839         }
840 
841         session.increaseWrittenBytes(localWrittenBytes, currentTime);
842 
843         if (!buf.hasRemaining() || !hasFragmentation && localWrittenBytes != 0) {
844             // Buffer has been sent, clear the current request.
845             buf.reset();
846             fireMessageSent(session, req);
847         }
848         return localWrittenBytes;
849     }
850 
851     private int writeFile(T session, WriteRequest req,
852             boolean hasFragmentation, int maxLength, long currentTime)
853             throws Exception {
854         int localWrittenBytes;
855         FileRegion region = (FileRegion) req.getMessage();
856         if (region.getRemainingBytes() > 0) {
857             int length;
858             if (hasFragmentation) {
859                 length = (int) Math.min(region.getRemainingBytes(), maxLength);
860             } else {
861                 length = (int) Math.min(Integer.MAX_VALUE, region
862                         .getRemainingBytes());
863             }
864             localWrittenBytes = transferFile(session, region, length);
865             region.update(localWrittenBytes);
866         } else {
867             localWrittenBytes = 0;
868         }
869 
870         session.increaseWrittenBytes(localWrittenBytes, currentTime);
871 
872         if (region.getRemainingBytes() <= 0 || !hasFragmentation
873                 && localWrittenBytes != 0) {
874             fireMessageSent(session, req);
875         }
876 
877         return localWrittenBytes;
878     }
879 
880     private void fireMessageSent(T session, WriteRequest req) {
881         session.setCurrentWriteRequest(null);
882         IoFilterChain filterChain = session.getFilterChain();
883         filterChain.fireMessageSent(req);
884     }
885 
886     /**
887      * Update the trafficControl for all the session which has
888      * just been opened. 
889      */
890     private void updateTrafficMask() {
891         int queueSize = trafficControllingSessions.size();
892         
893         while (queueSize > 0) {
894             T session = trafficControllingSessions.poll();
895 
896             if (session == null) {
897                 return;
898             }
899 
900             SessionState state = getState(session);
901             
902             switch (state) {
903                 case OPENED:
904                     updateTrafficControl(session);
905                     break;
906                     
907                 case CLOSING:
908                     break;
909                     
910                 case OPENING:
911                     // Retry later if session is not yet fully initialized.
912                     // (In case that Session.suspend??() or session.resume??() is
913                     // called before addSession() is processed)
914                     // We just put back the session at the end of the queue.
915                     trafficControllingSessions.add(session);
916                     
917                     break;
918                     
919                 default:
920                     throw new IllegalStateException(String.valueOf(state));
921             }
922             
923             // As we have handled one session, decrement the number of 
924             // remaining sessions.
925             queueSize--;
926         }
927     }
928 
929     /**
930      * {@inheritDoc}
931      */
932     public void updateTrafficControl(T session) {
933         try {
934             setInterestedInRead(session, !session.isReadSuspended());
935         } catch (Exception e) {
936             IoFilterChain filterChain = session.getFilterChain();
937             filterChain.fireExceptionCaught(e);
938         }
939         
940         try {
941             setInterestedInWrite(session, 
942                 !session.getWriteRequestQueue().isEmpty(session) && 
943                 !session.isWriteSuspended());
944         } catch (Exception e) {
945             IoFilterChain filterChain = session.getFilterChain();
946             filterChain.fireExceptionCaught(e);
947         }
948     }
949 
950     private class Processor implements Runnable {
951         public void run() {
952             int nSessions = 0;
953             lastIdleCheckTime = System.currentTimeMillis();
954 
955             for (;;) {
956                 try {
957                     // This select has a timeout so that we can manage
958                     // idle session when we get out of the select every
959                     // second. (note : this is a hack to avoid creating
960                     // a dedicated thread).
961                     int selected = select(SELECT_TIMEOUT);
962 
963                     nSessions += handleNewSessions();
964                     updateTrafficMask();
965 
966                     // Now, if we have had some incoming or outgoing events,
967                     // deal with them
968                     if (selected > 0) {
969                         process();
970                     }
971 
972                     long currentTime = System.currentTimeMillis();
973                     flush(currentTime);
974                     nSessions -= removeSessions();
975                     notifyIdleSessions(currentTime);
976 
977                     if (nSessions == 0) {
978                         synchronized (lock) {
979                             if (newSessions.isEmpty() && isSelectorEmpty()) {
980                                 processor = null;
981                                 break;
982                             }
983                         }
984                     }
985 
986                     // Disconnect all sessions immediately if disposal has been
987                     // requested so that we exit this loop eventually.
988                     if (isDisposing()) {
989                         for (Iterator<T> i = allSessions(); i.hasNext();) {
990                             scheduleRemove(i.next());
991                         }
992                         wakeup();
993                     }
994                 } catch (Throwable t) {
995                     ExceptionMonitor.getInstance().exceptionCaught(t);
996 
997                     try {
998                         Thread.sleep(1000);
999                     } catch (InterruptedException e1) {
1000                         ExceptionMonitor.getInstance().exceptionCaught(e1);
1001                     }
1002                 }
1003             }
1004 
1005             try {
1006                 synchronized (disposalLock) {
1007                     if (isDisposing()) {
1008                         dispose0();
1009                     }
1010                 }
1011             } catch (Throwable t) {
1012                 ExceptionMonitor.getInstance().exceptionCaught(t);
1013             } finally {
1014                 disposalFuture.setValue(true);
1015             }
1016         }
1017     }
1018 }