View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.DefaultFileRegion;
37  import org.apache.mina.core.file.FilenameFileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.future.CloseFuture;
40  import org.apache.mina.core.future.DefaultCloseFuture;
41  import org.apache.mina.core.future.DefaultReadFuture;
42  import org.apache.mina.core.future.DefaultWriteFuture;
43  import org.apache.mina.core.future.IoFutureListener;
44  import org.apache.mina.core.future.ReadFuture;
45  import org.apache.mina.core.future.WriteFuture;
46  import org.apache.mina.core.service.AbstractIoService;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoHandler;
49  import org.apache.mina.core.service.IoProcessor;
50  import org.apache.mina.core.service.IoService;
51  import org.apache.mina.core.service.TransportMetadata;
52  import org.apache.mina.core.write.DefaultWriteRequest;
53  import org.apache.mina.core.write.WriteException;
54  import org.apache.mina.core.write.WriteRequest;
55  import org.apache.mina.core.write.WriteRequestQueue;
56  import org.apache.mina.core.write.WriteTimeoutException;
57  import org.apache.mina.core.write.WriteToClosedSessionException;
58  import org.apache.mina.util.ExceptionMonitor;
59  
60  /**
61   * Base implementation of {@link IoSession}.
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractIoSession implements IoSession {
66      /** The associated handler */
67      private final IoHandler handler;
68  
69      /** The session config */
70      protected IoSessionConfig config;
71  
72      /** The service which will manage this session */
73      private final IoService service;
74  
75      private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
76              "readyReadFutures");
77  
78      private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
79              "waitingReadFutures");
80  
81      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
82          public void operationComplete(CloseFuture future) {
83              AbstractIoSession session = (AbstractIoSession) future.getSession();
84              session.scheduledWriteBytes.set(0);
85              session.scheduledWriteMessages.set(0);
86              session.readBytesThroughput = 0;
87              session.readMessagesThroughput = 0;
88              session.writtenBytesThroughput = 0;
89              session.writtenMessagesThroughput = 0;
90          }
91      };
92  
93      /**
94       * An internal write request object that triggers session close.
95       * 
96       * @see #writeRequestQueue
97       */
98      public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
99  
100     /**
101      * An internal write request object that triggers message sent events.
102      * 
103      * @see #writeRequestQueue
104      */
105     public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);
106 
107     private final Object lock = new Object();
108 
109     private IoSessionAttributeMap attributes;
110 
111     private WriteRequestQueue writeRequestQueue;
112 
113     private WriteRequest currentWriteRequest;
114 
115     /** The Session creation's time */
116     private final long creationTime;
117 
118     /** An id generator guaranteed to generate unique IDs for the session */
119     private static AtomicLong idGenerator = new AtomicLong(0);
120 
121     /** The session ID */
122     private long sessionId;
123 
124     /**
125      * A future that will be set 'closed' when the connection is closed.
126      */
127     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
128 
129     private volatile boolean closing;
130 
131     // traffic control
132     private boolean readSuspended = false;
133 
134     private boolean writeSuspended = false;
135 
136     // Status variables
137     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
138 
139     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
140 
141     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
142 
143     private long readBytes;
144 
145     private long writtenBytes;
146 
147     private long readMessages;
148 
149     private long writtenMessages;
150 
151     private long lastReadTime;
152 
153     private long lastWriteTime;
154 
155     private long lastThroughputCalculationTime;
156 
157     private long lastReadBytes;
158 
159     private long lastWrittenBytes;
160 
161     private long lastReadMessages;
162 
163     private long lastWrittenMessages;
164 
165     private double readBytesThroughput;
166 
167     private double writtenBytesThroughput;
168 
169     private double readMessagesThroughput;
170 
171     private double writtenMessagesThroughput;
172 
173     private AtomicInteger idleCountForBoth = new AtomicInteger();
174 
175     private AtomicInteger idleCountForRead = new AtomicInteger();
176 
177     private AtomicInteger idleCountForWrite = new AtomicInteger();
178 
179     private long lastIdleTimeForBoth;
180 
181     private long lastIdleTimeForRead;
182 
183     private long lastIdleTimeForWrite;
184 
185     private boolean deferDecreaseReadBuffer = true;
186 
187     /**
188      * Create a Session for a service
189      * 
190      * @param service the Service for this session
191      */
192     protected AbstractIoSession(IoService service) {
193         this.service = service;
194         this.handler = service.getHandler();
195 
196         // Initialize all the Session counters to the current time
197         long currentTime = System.currentTimeMillis();
198         creationTime = currentTime;
199         lastThroughputCalculationTime = currentTime;
200         lastReadTime = currentTime;
201         lastWriteTime = currentTime;
202         lastIdleTimeForBoth = currentTime;
203         lastIdleTimeForRead = currentTime;
204         lastIdleTimeForWrite = currentTime;
205 
206         // TODO add documentation
207         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
208 
209         // Set a new ID for this session
210         sessionId = idGenerator.incrementAndGet();
211     }
212 
213     /**
214      * {@inheritDoc}
215      * 
216      * We use an AtomicLong to guarantee that the session ID are unique.
217      */
218     public final long getId() {
219         return sessionId;
220     }
221 
222     /**
223      * @return The associated IoProcessor for this session
224      */
225     public abstract IoProcessor getProcessor();
226 
227     /**
228      * {@inheritDoc}
229      */
230     public final boolean isConnected() {
231         return !closeFuture.isClosed();
232     }
233 
234     /**
235      * {@inheritDoc}
236      */
237     public boolean isActive() {
238         // Return true by default
239         return true;
240     }
241 
242     /**
243      * {@inheritDoc}
244      */
245     public final boolean isClosing() {
246         return closing || closeFuture.isClosed();
247     }
248 
249     /**
250      * {@inheritDoc}
251      */
252     public boolean isSecured() {
253         // Always false...
254         return false;
255     }
256 
257     /**
258      * {@inheritDoc}
259      */
260     public final CloseFuture getCloseFuture() {
261         return closeFuture;
262     }
263 
264     /**
265      * Tells if the session is scheduled for flushed
266      * 
267      * @return true if the session is scheduled for flush
268      */
269     public final boolean isScheduledForFlush() {
270         return scheduledForFlush.get();
271     }
272 
273     /**
274      * Schedule the session for flushed
275      */
276     public final void scheduledForFlush() {
277         scheduledForFlush.set(true);
278     }
279 
280     /**
281      * Change the session's status : it's not anymore scheduled for flush
282      */
283     public final void unscheduledForFlush() {
284         scheduledForFlush.set(false);
285     }
286 
287     /**
288      * Set the scheduledForFLush flag. As we may have concurrent access to this
289      * flag, we compare and set it in one call.
290      * 
291      * @param schedule
292      *            the new value to set if not already set.
293      * @return true if the session flag has been set, and if it wasn't set
294      *         already.
295      */
296     public final boolean setScheduledForFlush(boolean schedule) {
297         if (schedule) {
298             // If the current tag is set to false, switch it to true,
299             // otherwise, we do nothing but return false : the session
300             // is already scheduled for flush
301             return scheduledForFlush.compareAndSet(false, schedule);
302         }
303 
304         scheduledForFlush.set(schedule);
305         return true;
306     }
307 
308     /**
309      * {@inheritDoc}
310      */
311     public final CloseFuture close(boolean rightNow) {
312         if (rightNow) {
313             return closeNow();
314         } else {
315             return closeOnFlush();
316         }
317     }
318 
319     /**
320      * {@inheritDoc}
321      */
322     public final CloseFuture close() {
323         return closeNow();
324     }
325 
326     /**
327      * {@inheritDoc}
328      */
329     public final CloseFuture closeOnFlush() {
330         if (!isClosing()) {
331             getWriteRequestQueue().offer(this, CLOSE_REQUEST);
332             getProcessor().flush(this);
333         }
334         
335         return closeFuture;
336     }
337 
338     /**
339      * {@inheritDoc}
340      */
341     public final CloseFuture closeNow() {
342         synchronized (lock) {
343             if (isClosing()) {
344                 return closeFuture;
345             }
346 
347             closing = true;
348             
349             try {
350                 destroy();
351             } catch (Exception e) {
352                 IoFilterChain filterChain = getFilterChain();
353                 filterChain.fireExceptionCaught(e);
354             }
355         }
356 
357         getFilterChain().fireFilterClose();
358 
359         return closeFuture;
360     }
361     
362     /**
363      * Destroy the session
364      */
365     protected void destroy() {
366         if (writeRequestQueue != null) {
367             while (!writeRequestQueue.isEmpty(this)) {
368                 WriteRequest writeRequest = writeRequestQueue.poll(this);
369                 writeRequest.getFuture().setWritten();
370             }
371         }
372     }
373 
374     /**
375      * {@inheritDoc}
376      */
377     public IoHandler getHandler() {
378         return handler;
379     }
380 
381     /**
382      * {@inheritDoc}
383      */
384     public IoSessionConfig getConfig() {
385         return config;
386     }
387 
388     /**
389      * {@inheritDoc}
390      */
391     public final ReadFuture read() {
392         if (!getConfig().isUseReadOperation()) {
393             throw new IllegalStateException("useReadOperation is not enabled.");
394         }
395 
396         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
397         ReadFuture future;
398         synchronized (readyReadFutures) {
399             future = readyReadFutures.poll();
400             if (future != null) {
401                 if (future.isClosed()) {
402                     // Let other readers get notified.
403                     readyReadFutures.offer(future);
404                 }
405             } else {
406                 future = new DefaultReadFuture(this);
407                 getWaitingReadFutures().offer(future);
408             }
409         }
410 
411         return future;
412     }
413 
414     /**
415      * Associates a message to a ReadFuture
416      * 
417      * @param message the message to associate to the ReadFuture
418      * 
419      */
420     public final void offerReadFuture(Object message) {
421         newReadFuture().setRead(message);
422     }
423 
424     /**
425      * Associates a failure to a ReadFuture
426      * 
427      * @param exception the exception to associate to the ReadFuture
428      */
429     public final void offerFailedReadFuture(Throwable exception) {
430         newReadFuture().setException(exception);
431     }
432 
433     /**
434      * Inform the ReadFuture that the session has been closed
435      */
436     public final void offerClosedReadFuture() {
437         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
438         
439         synchronized (readyReadFutures) {
440             newReadFuture().setClosed();
441         }
442     }
443 
444     /**
445      * @return a readFuture get from the waiting ReadFuture
446      */
447     private ReadFuture newReadFuture() {
448         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
449         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
450         ReadFuture future;
451         
452         synchronized (readyReadFutures) {
453             future = waitingReadFutures.poll();
454             
455             if (future == null) {
456                 future = new DefaultReadFuture(this);
457                 readyReadFutures.offer(future);
458             }
459         }
460         
461         return future;
462     }
463 
464     /**
465      * @return a queue of ReadFuture
466      */
467     private Queue<ReadFuture> getReadyReadFutures() {
468         Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
469         
470         if (readyReadFutures == null) {
471             readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
472 
473             Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
474                     readyReadFutures);
475             
476             if (oldReadyReadFutures != null) {
477                 readyReadFutures = oldReadyReadFutures;
478             }
479         }
480         
481         return readyReadFutures;
482     }
483 
484     /**
485      * @return the queue of waiting ReadFuture
486      */
487     private Queue<ReadFuture> getWaitingReadFutures() {
488         Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
489         
490         if (waitingReadyReadFutures == null) {
491             waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
492 
493             Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
494                     WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
495             
496             if (oldWaitingReadyReadFutures != null) {
497                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
498             }
499         }
500         
501         return waitingReadyReadFutures;
502     }
503 
504     /**
505      * {@inheritDoc}
506      */
507     public WriteFuture write(Object message) {
508         return write(message, null);
509     }
510 
511     /**
512      * {@inheritDoc}
513      */
514     public WriteFuture write(Object message, SocketAddress remoteAddress) {
515         if (message == null) {
516             throw new IllegalArgumentException("Trying to write a null message : not allowed");
517         }
518 
519         // We can't send a message to a connected session if we don't have
520         // the remote address
521         if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
522             throw new UnsupportedOperationException();
523         }
524 
525         // If the session has been closed or is closing, we can't either
526         // send a message to the remote side. We generate a future
527         // containing an exception.
528         if (isClosing() || !isConnected()) {
529             WriteFuture future = new DefaultWriteFuture(this);
530             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
531             WriteException writeException = new WriteToClosedSessionException(request);
532             future.setException(writeException);
533             return future;
534         }
535 
536         FileChannel openedFileChannel = null;
537 
538         // TODO: remove this code as soon as we use InputStream
539         // instead of Object for the message.
540         try {
541             if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
542                 // Nothing to write : probably an error in the user code
543                 throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
544             } else if (message instanceof FileChannel) {
545                 FileChannel fileChannel = (FileChannel) message;
546                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
547             } else if (message instanceof File) {
548                 File file = (File) message;
549                 openedFileChannel = new FileInputStream(file).getChannel();
550                 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
551             }
552         } catch (IOException e) {
553             ExceptionMonitor.getInstance().exceptionCaught(e);
554             return DefaultWriteFuture.newNotWrittenFuture(this, e);
555         }
556 
557         // Now, we can write the message. First, create a future
558         WriteFuture writeFuture = new DefaultWriteFuture(this);
559         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
560 
561         // Then, get the chain and inject the WriteRequest into it
562         IoFilterChain filterChain = getFilterChain();
563         filterChain.fireFilterWrite(writeRequest);
564 
565         // TODO : This is not our business ! The caller has created a
566         // FileChannel,
567         // he has to close it !
568         if (openedFileChannel != null) {
569             // If we opened a FileChannel, it needs to be closed when the write
570             // has completed
571             final FileChannel finalChannel = openedFileChannel;
572             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
573                 public void operationComplete(WriteFuture future) {
574                     try {
575                         finalChannel.close();
576                     } catch (IOException e) {
577                         ExceptionMonitor.getInstance().exceptionCaught(e);
578                     }
579                 }
580             });
581         }
582 
583         // Return the WriteFuture.
584         return writeFuture;
585     }
586 
587     /**
588      * {@inheritDoc}
589      */
590     public final Object getAttachment() {
591         return getAttribute("");
592     }
593 
594     /**
595      * {@inheritDoc}
596      */
597     public final Object setAttachment(Object attachment) {
598         return setAttribute("", attachment);
599     }
600 
601     /**
602      * {@inheritDoc}
603      */
604     public final Object getAttribute(Object key) {
605         return getAttribute(key, null);
606     }
607 
608     /**
609      * {@inheritDoc}
610      */
611     public final Object getAttribute(Object key, Object defaultValue) {
612         return attributes.getAttribute(this, key, defaultValue);
613     }
614 
615     /**
616      * {@inheritDoc}
617      */
618     public final Object setAttribute(Object key, Object value) {
619         return attributes.setAttribute(this, key, value);
620     }
621 
622     /**
623      * {@inheritDoc}
624      */
625     public final Object setAttribute(Object key) {
626         return setAttribute(key, Boolean.TRUE);
627     }
628 
629     /**
630      * {@inheritDoc}
631      */
632     public final Object setAttributeIfAbsent(Object key, Object value) {
633         return attributes.setAttributeIfAbsent(this, key, value);
634     }
635 
636     /**
637      * {@inheritDoc}
638      */
639     public final Object setAttributeIfAbsent(Object key) {
640         return setAttributeIfAbsent(key, Boolean.TRUE);
641     }
642 
643     /**
644      * {@inheritDoc}
645      */
646     public final Object removeAttribute(Object key) {
647         return attributes.removeAttribute(this, key);
648     }
649 
650     /**
651      * {@inheritDoc}
652      */
653     public final boolean removeAttribute(Object key, Object value) {
654         return attributes.removeAttribute(this, key, value);
655     }
656 
657     /**
658      * {@inheritDoc}
659      */
660     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
661         return attributes.replaceAttribute(this, key, oldValue, newValue);
662     }
663 
664     /**
665      * {@inheritDoc}
666      */
667     public final boolean containsAttribute(Object key) {
668         return attributes.containsAttribute(this, key);
669     }
670 
671     /**
672      * {@inheritDoc}
673      */
674     public final Set<Object> getAttributeKeys() {
675         return attributes.getAttributeKeys(this);
676     }
677 
678     /**
679      * @return The map of attributes associated with the session
680      */
681     public final IoSessionAttributeMap getAttributeMap() {
682         return attributes;
683     }
684 
685     /**
686      * Set the map of attributes associated with the session
687      * 
688      * @param attributes The Map of attributes
689      */
690     public final void setAttributeMap(IoSessionAttributeMap attributes) {
691         this.attributes = attributes;
692     }
693 
694     /**
695      * Create a new close aware write queue, based on the given write queue.
696      * 
697      * @param writeRequestQueue The write request queue
698      */
699     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
700         this.writeRequestQueue = writeRequestQueue;
701     }
702 
703     /**
704      * {@inheritDoc}
705      */
706     public final void suspendRead() {
707         readSuspended = true;
708         if (isClosing() || !isConnected()) {
709             return;
710         }
711         getProcessor().updateTrafficControl(this);
712     }
713 
714     /**
715      * {@inheritDoc}
716      */
717     public final void suspendWrite() {
718         writeSuspended = true;
719         if (isClosing() || !isConnected()) {
720             return;
721         }
722         getProcessor().updateTrafficControl(this);
723     }
724 
725     /**
726      * {@inheritDoc}
727      */
728     @SuppressWarnings("unchecked")
729     public final void resumeRead() {
730         readSuspended = false;
731         if (isClosing() || !isConnected()) {
732             return;
733         }
734         getProcessor().updateTrafficControl(this);
735     }
736 
737     /**
738      * {@inheritDoc}
739      */
740     @SuppressWarnings("unchecked")
741     public final void resumeWrite() {
742         writeSuspended = false;
743         if (isClosing() || !isConnected()) {
744             return;
745         }
746         getProcessor().updateTrafficControl(this);
747     }
748 
749     /**
750      * {@inheritDoc}
751      */
752     public boolean isReadSuspended() {
753         return readSuspended;
754     }
755 
756     /**
757      * {@inheritDoc}
758      */
759     public boolean isWriteSuspended() {
760         return writeSuspended;
761     }
762 
763     /**
764      * {@inheritDoc}
765      */
766     public final long getReadBytes() {
767         return readBytes;
768     }
769 
770     /**
771      * {@inheritDoc}
772      */
773     public final long getWrittenBytes() {
774         return writtenBytes;
775     }
776 
777     /**
778      * {@inheritDoc}
779      */
780     public final long getReadMessages() {
781         return readMessages;
782     }
783 
784     /**
785      * {@inheritDoc}
786      */
787     public final long getWrittenMessages() {
788         return writtenMessages;
789     }
790 
791     /**
792      * {@inheritDoc}
793      */
794     public final double getReadBytesThroughput() {
795         return readBytesThroughput;
796     }
797 
798     /**
799      * {@inheritDoc}
800      */
801     public final double getWrittenBytesThroughput() {
802         return writtenBytesThroughput;
803     }
804 
805     /**
806      * {@inheritDoc}
807      */
808     public final double getReadMessagesThroughput() {
809         return readMessagesThroughput;
810     }
811 
812     /**
813      * {@inheritDoc}
814      */
815     public final double getWrittenMessagesThroughput() {
816         return writtenMessagesThroughput;
817     }
818 
819     /**
820      * {@inheritDoc}
821      */
822     public final void updateThroughput(long currentTime, boolean force) {
823         int interval = (int) (currentTime - lastThroughputCalculationTime);
824 
825         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
826 
827         if (((minInterval == 0) || (interval < minInterval)) && !force) {
828             return;
829         }
830 
831         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
832         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
833         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
834         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
835 
836         lastReadBytes = readBytes;
837         lastWrittenBytes = writtenBytes;
838         lastReadMessages = readMessages;
839         lastWrittenMessages = writtenMessages;
840 
841         lastThroughputCalculationTime = currentTime;
842     }
843 
844     /**
845      * {@inheritDoc}
846      */
847     public final long getScheduledWriteBytes() {
848         return scheduledWriteBytes.get();
849     }
850 
851     /**
852      * {@inheritDoc}
853      */
854     public final int getScheduledWriteMessages() {
855         return scheduledWriteMessages.get();
856     }
857 
858     /**
859      * Set the number of scheduled write bytes
860      * 
861      * @param byteCount The number of scheduled bytes for write
862      */
863     protected void setScheduledWriteBytes(int byteCount) {
864         scheduledWriteBytes.set(byteCount);
865     }
866 
867     /**
868      * Set the number of scheduled write messages
869      * 
870      * @param messages The number of scheduled messages for write
871      */
872     protected void setScheduledWriteMessages(int messages) {
873         scheduledWriteMessages.set(messages);
874     }
875 
876     /**
877      * Increase the number of read bytes
878      * 
879      * @param increment The number of read bytes
880      * @param currentTime The current time
881      */
882     public final void increaseReadBytes(long increment, long currentTime) {
883         if (increment <= 0) {
884             return;
885         }
886 
887         readBytes += increment;
888         lastReadTime = currentTime;
889         idleCountForBoth.set(0);
890         idleCountForRead.set(0);
891 
892         if (getService() instanceof AbstractIoService) {
893             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
894         }
895     }
896 
897     /**
898      * Increase the number of read messages
899      * 
900      * @param currentTime The current time
901      */
902     public final void increaseReadMessages(long currentTime) {
903         readMessages++;
904         lastReadTime = currentTime;
905         idleCountForBoth.set(0);
906         idleCountForRead.set(0);
907 
908         if (getService() instanceof AbstractIoService) {
909             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
910         }
911     }
912 
913     /**
914      * Increase the number of written bytes
915      * 
916      * @param increment The number of written bytes
917      * @param currentTime The current time
918      */
919     public final void increaseWrittenBytes(int increment, long currentTime) {
920         if (increment <= 0) {
921             return;
922         }
923 
924         writtenBytes += increment;
925         lastWriteTime = currentTime;
926         idleCountForBoth.set(0);
927         idleCountForWrite.set(0);
928 
929         if (getService() instanceof AbstractIoService) {
930             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
931         }
932 
933         increaseScheduledWriteBytes(-increment);
934     }
935 
936     /**
937      * Increase the number of written messages
938      * 
939      * @param request The written message
940      * @param currentTime The current tile
941      */
942     public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
943         Object message = request.getMessage();
944 
945         if (message instanceof IoBuffer) {
946             IoBuffer b = (IoBuffer) message;
947 
948             if (b.hasRemaining()) {
949                 return;
950             }
951         }
952 
953         writtenMessages++;
954         lastWriteTime = currentTime;
955 
956         if (getService() instanceof AbstractIoService) {
957             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
958         }
959 
960         decreaseScheduledWriteMessages();
961     }
962 
963     /**
964      * Increase the number of scheduled write bytes for the session
965      * 
966      * @param increment The number of newly added bytes to write
967      */
968     public final void increaseScheduledWriteBytes(int increment) {
969         scheduledWriteBytes.addAndGet(increment);
970         if (getService() instanceof AbstractIoService) {
971             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
972         }
973     }
974 
975     /**
976      * Increase the number of scheduled message to write
977      */
978     public final void increaseScheduledWriteMessages() {
979         scheduledWriteMessages.incrementAndGet();
980         
981         if (getService() instanceof AbstractIoService) {
982             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
983         }
984     }
985 
986     /**
987      * Decrease the number of scheduled message written
988      */
989     private void decreaseScheduledWriteMessages() {
990         scheduledWriteMessages.decrementAndGet();
991         if (getService() instanceof AbstractIoService) {
992             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
993         }
994     }
995 
996     /**
997      * Decrease the counters of written messages and written bytes when a message has been written
998      * 
999      * @param request The written message
1000      */
1001     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
1002         Object message = request.getMessage();
1003         
1004         if (message instanceof IoBuffer) {
1005             IoBuffer b = (IoBuffer) message;
1006             
1007             if (b.hasRemaining()) {
1008                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
1009             } else {
1010                 decreaseScheduledWriteMessages();
1011             }
1012         } else {
1013             decreaseScheduledWriteMessages();
1014         }
1015     }
1016 
1017     /**
1018      * {@inheritDoc}
1019      */
1020     public final WriteRequestQueue getWriteRequestQueue() {
1021         if (writeRequestQueue == null) {
1022             throw new IllegalStateException();
1023         }
1024         
1025         return writeRequestQueue;
1026     }
1027 
1028     /**
1029      * {@inheritDoc}
1030      */
1031     public final WriteRequest getCurrentWriteRequest() {
1032         return currentWriteRequest;
1033     }
1034 
1035     /**
1036      * {@inheritDoc}
1037      */
1038     public final Object getCurrentWriteMessage() {
1039         WriteRequest req = getCurrentWriteRequest();
1040         
1041         if (req == null) {
1042             return null;
1043         }
1044         return req.getMessage();
1045     }
1046 
1047     /**
1048      * {@inheritDoc}
1049      */
1050     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1051         this.currentWriteRequest = currentWriteRequest;
1052     }
1053 
1054     /**
1055      * Increase the ReadBuffer size (it will double)
1056      */
1057     public final void increaseReadBufferSize() {
1058         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1059         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1060             getConfig().setReadBufferSize(newReadBufferSize);
1061         } else {
1062             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1063         }
1064 
1065         deferDecreaseReadBuffer = true;
1066     }
1067 
1068     /**
1069      * Decrease the ReadBuffer size (it will be divided by a factor 2)
1070      */
1071     public final void decreaseReadBufferSize() {
1072         if (deferDecreaseReadBuffer) {
1073             deferDecreaseReadBuffer = false;
1074             return;
1075         }
1076 
1077         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1078             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1079         }
1080 
1081         deferDecreaseReadBuffer = true;
1082     }
1083 
1084     /**
1085      * {@inheritDoc}
1086      */
1087     public final long getCreationTime() {
1088         return creationTime;
1089     }
1090 
1091     /**
1092      * {@inheritDoc}
1093      */
1094     public final long getLastIoTime() {
1095         return Math.max(lastReadTime, lastWriteTime);
1096     }
1097 
1098     /**
1099      * {@inheritDoc}
1100      */
1101     public final long getLastReadTime() {
1102         return lastReadTime;
1103     }
1104 
1105     /**
1106      * {@inheritDoc}
1107      */
1108     public final long getLastWriteTime() {
1109         return lastWriteTime;
1110     }
1111 
1112     /**
1113      * {@inheritDoc}
1114      */
1115     public final boolean isIdle(IdleStatus status) {
1116         if (status == IdleStatus.BOTH_IDLE) {
1117             return idleCountForBoth.get() > 0;
1118         }
1119 
1120         if (status == IdleStatus.READER_IDLE) {
1121             return idleCountForRead.get() > 0;
1122         }
1123 
1124         if (status == IdleStatus.WRITER_IDLE) {
1125             return idleCountForWrite.get() > 0;
1126         }
1127 
1128         throw new IllegalArgumentException("Unknown idle status: " + status);
1129     }
1130 
1131     /**
1132      * {@inheritDoc}
1133      */
1134     public final boolean isBothIdle() {
1135         return isIdle(IdleStatus.BOTH_IDLE);
1136     }
1137 
1138     /**
1139      * {@inheritDoc}
1140      */
1141     public final boolean isReaderIdle() {
1142         return isIdle(IdleStatus.READER_IDLE);
1143     }
1144 
1145     /**
1146      * {@inheritDoc}
1147      */
1148     public final boolean isWriterIdle() {
1149         return isIdle(IdleStatus.WRITER_IDLE);
1150     }
1151 
1152     /**
1153      * {@inheritDoc}
1154      */
1155     public final int getIdleCount(IdleStatus status) {
1156         if (getConfig().getIdleTime(status) == 0) {
1157             if (status == IdleStatus.BOTH_IDLE) {
1158                 idleCountForBoth.set(0);
1159             }
1160 
1161             if (status == IdleStatus.READER_IDLE) {
1162                 idleCountForRead.set(0);
1163             }
1164 
1165             if (status == IdleStatus.WRITER_IDLE) {
1166                 idleCountForWrite.set(0);
1167             }
1168         }
1169 
1170         if (status == IdleStatus.BOTH_IDLE) {
1171             return idleCountForBoth.get();
1172         }
1173 
1174         if (status == IdleStatus.READER_IDLE) {
1175             return idleCountForRead.get();
1176         }
1177 
1178         if (status == IdleStatus.WRITER_IDLE) {
1179             return idleCountForWrite.get();
1180         }
1181 
1182         throw new IllegalArgumentException("Unknown idle status: " + status);
1183     }
1184 
1185     /**
1186      * {@inheritDoc}
1187      */
1188     public final long getLastIdleTime(IdleStatus status) {
1189         if (status == IdleStatus.BOTH_IDLE) {
1190             return lastIdleTimeForBoth;
1191         }
1192 
1193         if (status == IdleStatus.READER_IDLE) {
1194             return lastIdleTimeForRead;
1195         }
1196 
1197         if (status == IdleStatus.WRITER_IDLE) {
1198             return lastIdleTimeForWrite;
1199         }
1200 
1201         throw new IllegalArgumentException("Unknown idle status: " + status);
1202     }
1203 
1204     /**
1205      * Increase the count of the various Idle counter
1206      * 
1207      * @param status The current status
1208      * @param currentTime The current time
1209      */
1210     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1211         if (status == IdleStatus.BOTH_IDLE) {
1212             idleCountForBoth.incrementAndGet();
1213             lastIdleTimeForBoth = currentTime;
1214         } else if (status == IdleStatus.READER_IDLE) {
1215             idleCountForRead.incrementAndGet();
1216             lastIdleTimeForRead = currentTime;
1217         } else if (status == IdleStatus.WRITER_IDLE) {
1218             idleCountForWrite.incrementAndGet();
1219             lastIdleTimeForWrite = currentTime;
1220         } else {
1221             throw new IllegalArgumentException("Unknown idle status: " + status);
1222         }
1223     }
1224 
1225     /**
1226      * {@inheritDoc}
1227      */
1228     public final int getBothIdleCount() {
1229         return getIdleCount(IdleStatus.BOTH_IDLE);
1230     }
1231 
1232     /**
1233      * {@inheritDoc}
1234      */
1235     public final long getLastBothIdleTime() {
1236         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1237     }
1238 
1239     /**
1240      * {@inheritDoc}
1241      */
1242     public final long getLastReaderIdleTime() {
1243         return getLastIdleTime(IdleStatus.READER_IDLE);
1244     }
1245 
1246     /**
1247      * {@inheritDoc}
1248      */
1249     public final long getLastWriterIdleTime() {
1250         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1251     }
1252 
1253     /**
1254      * {@inheritDoc}
1255      */
1256     public final int getReaderIdleCount() {
1257         return getIdleCount(IdleStatus.READER_IDLE);
1258     }
1259 
1260     /**
1261      * {@inheritDoc}
1262      */
1263     public final int getWriterIdleCount() {
1264         return getIdleCount(IdleStatus.WRITER_IDLE);
1265     }
1266 
1267     /**
1268      * {@inheritDoc}
1269      */
1270     public SocketAddress getServiceAddress() {
1271         IoService service = getService();
1272         if (service instanceof IoAcceptor) {
1273             return ((IoAcceptor) service).getLocalAddress();
1274         }
1275 
1276         return getRemoteAddress();
1277     }
1278 
1279     /**
1280      * {@inheritDoc}
1281      */
1282     @Override
1283     public final int hashCode() {
1284         return super.hashCode();
1285     }
1286 
1287     /**
1288      * {@inheritDoc} TODO This is a ridiculous implementation. Need to be
1289      * replaced.
1290      */
1291     @Override
1292     public final boolean equals(Object o) {
1293         return super.equals(o);
1294     }
1295 
1296     /**
1297      * {@inheritDoc}
1298      */
1299     @Override
1300     public String toString() {
1301         if (isConnected() || isClosing()) {
1302             String remote = null;
1303             String local = null;
1304 
1305             try {
1306                 remote = String.valueOf(getRemoteAddress());
1307             } catch (Exception e) {
1308                 remote = "Cannot get the remote address informations: " + e.getMessage();
1309             }
1310 
1311             try {
1312                 local = String.valueOf(getLocalAddress());
1313             } catch (Exception e) {
1314             }
1315 
1316             if (getService() instanceof IoAcceptor) {
1317                 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1318             }
1319 
1320             return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1321         }
1322 
1323         return "(" + getIdAsString() + ") Session disconnected ...";
1324     }
1325 
1326     /**
1327      * Get the Id as a String
1328      */
1329     private String getIdAsString() {
1330         String id = Long.toHexString(getId()).toUpperCase();
1331         
1332         if (id.length() <= 8) {
1333             return "0x00000000".substring(0, 10 - id.length()) + id;
1334         } else {
1335             return "0x" + id;
1336         }
1337     }
1338 
1339     /**
1340      * TGet the Service name
1341      */
1342     private String getServiceName() {
1343         TransportMetadata tm = getTransportMetadata();
1344         if (tm == null) {
1345             return "null";
1346         }
1347 
1348         return tm.getProviderName() + ' ' + tm.getName();
1349     }
1350 
1351     /**
1352      * {@inheritDoc}
1353      */
1354     public IoService getService() {
1355         return service;
1356     }
1357 
1358     /**
1359      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
1360      * in the specified collection.
1361      * 
1362      * @param sessions The sessions that are notified
1363      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1364      */
1365     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1366         while (sessions.hasNext()) {
1367             IoSession session = sessions.next();
1368             
1369             if (!session.getCloseFuture().isClosed()) {
1370                 notifyIdleSession(session, currentTime);
1371             }
1372         }
1373     }
1374 
1375     /**
1376      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1377      * specified {@code session}.
1378      * 
1379      * @param session The session that is notified
1380      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1381      */
1382     public static void notifyIdleSession(IoSession session, long currentTime) {
1383         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1384                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1385 
1386         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1387                 IdleStatus.READER_IDLE,
1388                 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1389 
1390         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1391                 IdleStatus.WRITER_IDLE,
1392                 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1393 
1394         notifyWriteTimeout(session, currentTime);
1395     }
1396 
1397     private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1398             long lastIoTime) {
1399         if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1400             session.getFilterChain().fireSessionIdle(status);
1401         }
1402     }
1403 
1404     private static void notifyWriteTimeout(IoSession session, long currentTime) {
1405 
1406         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1407         if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1408                 && !session.getWriteRequestQueue().isEmpty(session)) {
1409             WriteRequest request = session.getCurrentWriteRequest();
1410             if (request != null) {
1411                 session.setCurrentWriteRequest(null);
1412                 WriteTimeoutException cause = new WriteTimeoutException(request);
1413                 request.getFuture().setException(cause);
1414                 session.getFilterChain().fireExceptionCaught(cause);
1415                 // WriteException is an IOException, so we close the session.
1416                 session.closeNow();
1417             }
1418         }
1419     }
1420 }