View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.ConcurrentLinkedQueue;
31  import java.util.concurrent.atomic.AtomicBoolean;
32  import java.util.concurrent.atomic.AtomicInteger;
33  import java.util.concurrent.atomic.AtomicLong;
34  
35  import org.apache.mina.core.buffer.IoBuffer;
36  import org.apache.mina.core.file.DefaultFileRegion;
37  import org.apache.mina.core.file.FilenameFileRegion;
38  import org.apache.mina.core.filterchain.IoFilterChain;
39  import org.apache.mina.core.future.CloseFuture;
40  import org.apache.mina.core.future.DefaultCloseFuture;
41  import org.apache.mina.core.future.DefaultReadFuture;
42  import org.apache.mina.core.future.DefaultWriteFuture;
43  import org.apache.mina.core.future.IoFutureListener;
44  import org.apache.mina.core.future.ReadFuture;
45  import org.apache.mina.core.future.WriteFuture;
46  import org.apache.mina.core.service.AbstractIoService;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoHandler;
49  import org.apache.mina.core.service.IoProcessor;
50  import org.apache.mina.core.service.IoService;
51  import org.apache.mina.core.service.TransportMetadata;
52  import org.apache.mina.core.write.DefaultWriteRequest;
53  import org.apache.mina.core.write.WriteException;
54  import org.apache.mina.core.write.WriteRequest;
55  import org.apache.mina.core.write.WriteRequestQueue;
56  import org.apache.mina.core.write.WriteTimeoutException;
57  import org.apache.mina.core.write.WriteToClosedSessionException;
58  import org.apache.mina.util.ExceptionMonitor;
59  
60  /**
61   * Base implementation of {@link IoSession}.
62   * 
63   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
64   */
65  public abstract class AbstractIoSession implements IoSession {
66      /** The associated handler */
67      private final IoHandler handler;
68  
69      /** The session config */
70      protected IoSessionConfig config;
71  
72      /** The service which will manage this session */
73      private final IoService service;
74  
75      private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
76              "readyReadFutures");
77  
78      private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
79              "waitingReadFutures");
80  
81      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
82          public void operationComplete(CloseFuture future) {
83              AbstractIoSession session = (AbstractIoSession) future.getSession();
84              session.scheduledWriteBytes.set(0);
85              session.scheduledWriteMessages.set(0);
86              session.readBytesThroughput = 0;
87              session.readMessagesThroughput = 0;
88              session.writtenBytesThroughput = 0;
89              session.writtenMessagesThroughput = 0;
90          }
91      };
92  
93      /**
94       * An internal write request object that triggers session close.
95       * 
96       * @see #writeRequestQueue
97       */
98      public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
99  
100     /**
101      * An internal write request object that triggers message sent events.
102      * 
103      * @see #writeRequestQueue
104      */
105     public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);
106 
107     private final Object lock = new Object();
108 
109     private IoSessionAttributeMap attributes;
110 
111     private WriteRequestQueue writeRequestQueue;
112 
113     private WriteRequest currentWriteRequest;
114 
115     /** The Session creation's time */
116     private final long creationTime;
117 
118     /** An id generator guaranteed to generate unique IDs for the session */
119     private static AtomicLong idGenerator = new AtomicLong(0);
120 
121     /** The session ID */
122     private long sessionId;
123 
124     /**
125      * A future that will be set 'closed' when the connection is closed.
126      */
127     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
128 
129     private volatile boolean closing;
130 
131     // traffic control
132     private boolean readSuspended = false;
133 
134     private boolean writeSuspended = false;
135 
136     // Status variables
137     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
138 
139     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
140 
141     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
142 
143     private long readBytes;
144 
145     private long writtenBytes;
146 
147     private long readMessages;
148 
149     private long writtenMessages;
150 
151     private long lastReadTime;
152 
153     private long lastWriteTime;
154 
155     private long lastThroughputCalculationTime;
156 
157     private long lastReadBytes;
158 
159     private long lastWrittenBytes;
160 
161     private long lastReadMessages;
162 
163     private long lastWrittenMessages;
164 
165     private double readBytesThroughput;
166 
167     private double writtenBytesThroughput;
168 
169     private double readMessagesThroughput;
170 
171     private double writtenMessagesThroughput;
172 
173     private AtomicInteger idleCountForBoth = new AtomicInteger();
174 
175     private AtomicInteger idleCountForRead = new AtomicInteger();
176 
177     private AtomicInteger idleCountForWrite = new AtomicInteger();
178 
179     private long lastIdleTimeForBoth;
180 
181     private long lastIdleTimeForRead;
182 
183     private long lastIdleTimeForWrite;
184 
185     private boolean deferDecreaseReadBuffer = true;
186 
187     /**
188      * Create a Session for a service
189      * 
190      * @param service the Service for this session
191      */
192     protected AbstractIoSession(IoService service) {
193         this.service = service;
194         this.handler = service.getHandler();
195 
196         // Initialize all the Session counters to the current time
197         long currentTime = System.currentTimeMillis();
198         creationTime = currentTime;
199         lastThroughputCalculationTime = currentTime;
200         lastReadTime = currentTime;
201         lastWriteTime = currentTime;
202         lastIdleTimeForBoth = currentTime;
203         lastIdleTimeForRead = currentTime;
204         lastIdleTimeForWrite = currentTime;
205 
206         // TODO add documentation
207         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
208 
209         // Set a new ID for this session
210         sessionId = idGenerator.incrementAndGet();
211     }
212 
213     /**
214      * {@inheritDoc}
215      * 
216      * We use an AtomicLong to guarantee that the session ID are unique.
217      */
218     public final long getId() {
219         return sessionId;
220     }
221 
222     /**
223      * @return The associated IoProcessor for this session
224      */
225     public abstract IoProcessor getProcessor();
226 
227     /**
228      * {@inheritDoc}
229      */
230     public final boolean isConnected() {
231         return !closeFuture.isClosed();
232     }
233 
234     /**
235      * {@inheritDoc}
236      */
237     public boolean isActive() {
238         // Return true by default
239         return true;
240     }
241 
242     /**
243      * {@inheritDoc}
244      */
245     public final boolean isClosing() {
246         return closing || closeFuture.isClosed();
247     }
248 
249     /**
250      * {@inheritDoc}
251      */
252     public boolean isSecured() {
253         // Always false...
254         return false;
255     }
256 
257     /**
258      * {@inheritDoc}
259      */
260     public final CloseFuture getCloseFuture() {
261         return closeFuture;
262     }
263 
264     /**
265      * Tells if the session is scheduled for flushed
266      * 
267      * @return true if the session is scheduled for flush
268      */
269     public final boolean isScheduledForFlush() {
270         return scheduledForFlush.get();
271     }
272 
273     /**
274      * Schedule the session for flushed
275      */
276     public final void scheduledForFlush() {
277         scheduledForFlush.set(true);
278     }
279 
280     /**
281      * Change the session's status : it's not anymore scheduled for flush
282      */
283     public final void unscheduledForFlush() {
284         scheduledForFlush.set(false);
285     }
286 
287     /**
288      * Set the scheduledForFLush flag. As we may have concurrent access to this
289      * flag, we compare and set it in one call.
290      * 
291      * @param schedule
292      *            the new value to set if not already set.
293      * @return true if the session flag has been set, and if it wasn't set
294      *         already.
295      */
296     public final boolean setScheduledForFlush(boolean schedule) {
297         if (schedule) {
298             // If the current tag is set to false, switch it to true,
299             // otherwise, we do nothing but return false : the session
300             // is already scheduled for flush
301             return scheduledForFlush.compareAndSet(false, schedule);
302         }
303 
304         scheduledForFlush.set(schedule);
305         return true;
306     }
307 
308     /**
309      * {@inheritDoc}
310      */
311     public final CloseFuture close(boolean rightNow) {
312         if (rightNow) {
313             return closeNow();
314         } else {
315             return closeOnFlush();
316         }
317     }
318 
319     /**
320      * {@inheritDoc}
321      */
322     public final CloseFuture close() {
323         return closeNow();
324     }
325 
326     /**
327      * {@inheritDoc}
328      */
329     public final CloseFuture closeOnFlush() {
330         if (!isClosing()) {
331             getWriteRequestQueue().offer(this, CLOSE_REQUEST);
332             getProcessor().flush(this);
333         }
334         
335         return closeFuture;
336     }
337 
338     /**
339      * {@inheritDoc}
340      */
341     public final CloseFuture closeNow() {
342         synchronized (lock) {
343             if (isClosing()) {
344                 return closeFuture;
345             }
346 
347             closing = true;
348             
349             try {
350                 destroy();
351             } catch (Exception e) {
352                 IoFilterChain filterChain = getFilterChain();
353                 filterChain.fireExceptionCaught(e);
354             }
355         }
356 
357         getFilterChain().fireFilterClose();
358 
359         return closeFuture;
360     }
361     
362     /**
363      * Destroy the session
364      */
365     protected void destroy() {
366         if (writeRequestQueue != null) {
367             while (!writeRequestQueue.isEmpty(this)) {
368                 WriteRequest writeRequest = writeRequestQueue.poll(this);
369                 
370                 if (writeRequest != null) {
371                     WriteFuture writeFuture = writeRequest.getFuture();
372                     
373                     // The WriteRequest may not always have a future : The CLOSE_REQUEST
374                     // and MESSAGE_SENT_REQUEST don't.
375                     if (writeFuture != null) {
376                         writeFuture.setWritten();
377                     }
378                 }
379             }
380         }
381     }
382 
383     /**
384      * {@inheritDoc}
385      */
386     public IoHandler getHandler() {
387         return handler;
388     }
389 
390     /**
391      * {@inheritDoc}
392      */
393     public IoSessionConfig getConfig() {
394         return config;
395     }
396 
397     /**
398      * {@inheritDoc}
399      */
400     public final ReadFuture read() {
401         if (!getConfig().isUseReadOperation()) {
402             throw new IllegalStateException("useReadOperation is not enabled.");
403         }
404 
405         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
406         ReadFuture future;
407         
408         synchronized (readyReadFutures) {
409             future = readyReadFutures.poll();
410             
411             if (future != null) {
412                 if (future.isClosed()) {
413                     // Let other readers get notified.
414                     readyReadFutures.offer(future);
415                 }
416             } else {
417                 future = new DefaultReadFuture(this);
418                 getWaitingReadFutures().offer(future);
419             }
420         }
421 
422         return future;
423     }
424 
425     /**
426      * Associates a message to a ReadFuture
427      * 
428      * @param message the message to associate to the ReadFuture
429      * 
430      */
431     public final void offerReadFuture(Object message) {
432         newReadFuture().setRead(message);
433     }
434 
435     /**
436      * Associates a failure to a ReadFuture
437      * 
438      * @param exception the exception to associate to the ReadFuture
439      */
440     public final void offerFailedReadFuture(Throwable exception) {
441         newReadFuture().setException(exception);
442     }
443 
444     /**
445      * Inform the ReadFuture that the session has been closed
446      */
447     public final void offerClosedReadFuture() {
448         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
449         
450         synchronized (readyReadFutures) {
451             newReadFuture().setClosed();
452         }
453     }
454 
455     /**
456      * @return a readFuture get from the waiting ReadFuture
457      */
458     private ReadFuture newReadFuture() {
459         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
460         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
461         ReadFuture future;
462         
463         synchronized (readyReadFutures) {
464             future = waitingReadFutures.poll();
465             
466             if (future == null) {
467                 future = new DefaultReadFuture(this);
468                 readyReadFutures.offer(future);
469             }
470         }
471         
472         return future;
473     }
474 
475     /**
476      * @return a queue of ReadFuture
477      */
478     private Queue<ReadFuture> getReadyReadFutures() {
479         Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
480         
481         if (readyReadFutures == null) {
482             readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
483 
484             Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
485                     readyReadFutures);
486             
487             if (oldReadyReadFutures != null) {
488                 readyReadFutures = oldReadyReadFutures;
489             }
490         }
491         
492         return readyReadFutures;
493     }
494 
495     /**
496      * @return the queue of waiting ReadFuture
497      */
498     private Queue<ReadFuture> getWaitingReadFutures() {
499         Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
500         
501         if (waitingReadyReadFutures == null) {
502             waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
503 
504             Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
505                     WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
506             
507             if (oldWaitingReadyReadFutures != null) {
508                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
509             }
510         }
511         
512         return waitingReadyReadFutures;
513     }
514 
515     /**
516      * {@inheritDoc}
517      */
518     public WriteFuture write(Object message) {
519         return write(message, null);
520     }
521 
522     /**
523      * {@inheritDoc}
524      */
525     public WriteFuture write(Object message, SocketAddress remoteAddress) {
526         if (message == null) {
527             throw new IllegalArgumentException("Trying to write a null message : not allowed");
528         }
529 
530         // We can't send a message to a connected session if we don't have
531         // the remote address
532         if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
533             throw new UnsupportedOperationException();
534         }
535 
536         // If the session has been closed or is closing, we can't either
537         // send a message to the remote side. We generate a future
538         // containing an exception.
539         if (isClosing() || !isConnected()) {
540             WriteFuture future = new DefaultWriteFuture(this);
541             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
542             WriteException writeException = new WriteToClosedSessionException(request);
543             future.setException(writeException);
544             return future;
545         }
546 
547         FileChannel openedFileChannel = null;
548 
549         // TODO: remove this code as soon as we use InputStream
550         // instead of Object for the message.
551         try {
552             if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
553                 // Nothing to write : probably an error in the user code
554                 throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
555             } else if (message instanceof FileChannel) {
556                 FileChannel fileChannel = (FileChannel) message;
557                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
558             } else if (message instanceof File) {
559                 File file = (File) message;
560                 openedFileChannel = new FileInputStream(file).getChannel();
561                 message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
562             }
563         } catch (IOException e) {
564             ExceptionMonitor.getInstance().exceptionCaught(e);
565             return DefaultWriteFuture.newNotWrittenFuture(this, e);
566         }
567 
568         // Now, we can write the message. First, create a future
569         WriteFuture writeFuture = new DefaultWriteFuture(this);
570         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
571 
572         // Then, get the chain and inject the WriteRequest into it
573         IoFilterChain filterChain = getFilterChain();
574         filterChain.fireFilterWrite(writeRequest);
575 
576         // TODO : This is not our business ! The caller has created a
577         // FileChannel,
578         // he has to close it !
579         if (openedFileChannel != null) {
580             // If we opened a FileChannel, it needs to be closed when the write
581             // has completed
582             final FileChannel finalChannel = openedFileChannel;
583             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
584                 public void operationComplete(WriteFuture future) {
585                     try {
586                         finalChannel.close();
587                     } catch (IOException e) {
588                         ExceptionMonitor.getInstance().exceptionCaught(e);
589                     }
590                 }
591             });
592         }
593 
594         // Return the WriteFuture.
595         return writeFuture;
596     }
597 
598     /**
599      * {@inheritDoc}
600      */
601     public final Object getAttachment() {
602         return getAttribute("");
603     }
604 
605     /**
606      * {@inheritDoc}
607      */
608     public final Object setAttachment(Object attachment) {
609         return setAttribute("", attachment);
610     }
611 
612     /**
613      * {@inheritDoc}
614      */
615     public final Object getAttribute(Object key) {
616         return getAttribute(key, null);
617     }
618 
619     /**
620      * {@inheritDoc}
621      */
622     public final Object getAttribute(Object key, Object defaultValue) {
623         return attributes.getAttribute(this, key, defaultValue);
624     }
625 
626     /**
627      * {@inheritDoc}
628      */
629     public final Object setAttribute(Object key, Object value) {
630         return attributes.setAttribute(this, key, value);
631     }
632 
633     /**
634      * {@inheritDoc}
635      */
636     public final Object setAttribute(Object key) {
637         return setAttribute(key, Boolean.TRUE);
638     }
639 
640     /**
641      * {@inheritDoc}
642      */
643     public final Object setAttributeIfAbsent(Object key, Object value) {
644         return attributes.setAttributeIfAbsent(this, key, value);
645     }
646 
647     /**
648      * {@inheritDoc}
649      */
650     public final Object setAttributeIfAbsent(Object key) {
651         return setAttributeIfAbsent(key, Boolean.TRUE);
652     }
653 
654     /**
655      * {@inheritDoc}
656      */
657     public final Object removeAttribute(Object key) {
658         return attributes.removeAttribute(this, key);
659     }
660 
661     /**
662      * {@inheritDoc}
663      */
664     public final boolean removeAttribute(Object key, Object value) {
665         return attributes.removeAttribute(this, key, value);
666     }
667 
668     /**
669      * {@inheritDoc}
670      */
671     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
672         return attributes.replaceAttribute(this, key, oldValue, newValue);
673     }
674 
675     /**
676      * {@inheritDoc}
677      */
678     public final boolean containsAttribute(Object key) {
679         return attributes.containsAttribute(this, key);
680     }
681 
682     /**
683      * {@inheritDoc}
684      */
685     public final Set<Object> getAttributeKeys() {
686         return attributes.getAttributeKeys(this);
687     }
688 
689     /**
690      * @return The map of attributes associated with the session
691      */
692     public final IoSessionAttributeMap getAttributeMap() {
693         return attributes;
694     }
695 
696     /**
697      * Set the map of attributes associated with the session
698      * 
699      * @param attributes The Map of attributes
700      */
701     public final void setAttributeMap(IoSessionAttributeMap attributes) {
702         this.attributes = attributes;
703     }
704 
705     /**
706      * Create a new close aware write queue, based on the given write queue.
707      * 
708      * @param writeRequestQueue The write request queue
709      */
710     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
711         this.writeRequestQueue = writeRequestQueue;
712     }
713 
714     /**
715      * {@inheritDoc}
716      */
717     public final void suspendRead() {
718         readSuspended = true;
719         if (isClosing() || !isConnected()) {
720             return;
721         }
722         getProcessor().updateTrafficControl(this);
723     }
724 
725     /**
726      * {@inheritDoc}
727      */
728     public final void suspendWrite() {
729         writeSuspended = true;
730         if (isClosing() || !isConnected()) {
731             return;
732         }
733         getProcessor().updateTrafficControl(this);
734     }
735 
736     /**
737      * {@inheritDoc}
738      */
739     @SuppressWarnings("unchecked")
740     public final void resumeRead() {
741         readSuspended = false;
742         if (isClosing() || !isConnected()) {
743             return;
744         }
745         getProcessor().updateTrafficControl(this);
746     }
747 
748     /**
749      * {@inheritDoc}
750      */
751     @SuppressWarnings("unchecked")
752     public final void resumeWrite() {
753         writeSuspended = false;
754         if (isClosing() || !isConnected()) {
755             return;
756         }
757         getProcessor().updateTrafficControl(this);
758     }
759 
760     /**
761      * {@inheritDoc}
762      */
763     public boolean isReadSuspended() {
764         return readSuspended;
765     }
766 
767     /**
768      * {@inheritDoc}
769      */
770     public boolean isWriteSuspended() {
771         return writeSuspended;
772     }
773 
774     /**
775      * {@inheritDoc}
776      */
777     public final long getReadBytes() {
778         return readBytes;
779     }
780 
781     /**
782      * {@inheritDoc}
783      */
784     public final long getWrittenBytes() {
785         return writtenBytes;
786     }
787 
788     /**
789      * {@inheritDoc}
790      */
791     public final long getReadMessages() {
792         return readMessages;
793     }
794 
795     /**
796      * {@inheritDoc}
797      */
798     public final long getWrittenMessages() {
799         return writtenMessages;
800     }
801 
802     /**
803      * {@inheritDoc}
804      */
805     public final double getReadBytesThroughput() {
806         return readBytesThroughput;
807     }
808 
809     /**
810      * {@inheritDoc}
811      */
812     public final double getWrittenBytesThroughput() {
813         return writtenBytesThroughput;
814     }
815 
816     /**
817      * {@inheritDoc}
818      */
819     public final double getReadMessagesThroughput() {
820         return readMessagesThroughput;
821     }
822 
823     /**
824      * {@inheritDoc}
825      */
826     public final double getWrittenMessagesThroughput() {
827         return writtenMessagesThroughput;
828     }
829 
830     /**
831      * {@inheritDoc}
832      */
833     public final void updateThroughput(long currentTime, boolean force) {
834         int interval = (int) (currentTime - lastThroughputCalculationTime);
835 
836         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
837 
838         if (((minInterval == 0) || (interval < minInterval)) && !force) {
839             return;
840         }
841 
842         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
843         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
844         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
845         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
846 
847         lastReadBytes = readBytes;
848         lastWrittenBytes = writtenBytes;
849         lastReadMessages = readMessages;
850         lastWrittenMessages = writtenMessages;
851 
852         lastThroughputCalculationTime = currentTime;
853     }
854 
855     /**
856      * {@inheritDoc}
857      */
858     public final long getScheduledWriteBytes() {
859         return scheduledWriteBytes.get();
860     }
861 
862     /**
863      * {@inheritDoc}
864      */
865     public final int getScheduledWriteMessages() {
866         return scheduledWriteMessages.get();
867     }
868 
869     /**
870      * Set the number of scheduled write bytes
871      * 
872      * @param byteCount The number of scheduled bytes for write
873      */
874     protected void setScheduledWriteBytes(int byteCount) {
875         scheduledWriteBytes.set(byteCount);
876     }
877 
878     /**
879      * Set the number of scheduled write messages
880      * 
881      * @param messages The number of scheduled messages for write
882      */
883     protected void setScheduledWriteMessages(int messages) {
884         scheduledWriteMessages.set(messages);
885     }
886 
887     /**
888      * Increase the number of read bytes
889      * 
890      * @param increment The number of read bytes
891      * @param currentTime The current time
892      */
893     public final void increaseReadBytes(long increment, long currentTime) {
894         if (increment <= 0) {
895             return;
896         }
897 
898         readBytes += increment;
899         lastReadTime = currentTime;
900         idleCountForBoth.set(0);
901         idleCountForRead.set(0);
902 
903         if (getService() instanceof AbstractIoService) {
904             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
905         }
906     }
907 
908     /**
909      * Increase the number of read messages
910      * 
911      * @param currentTime The current time
912      */
913     public final void increaseReadMessages(long currentTime) {
914         readMessages++;
915         lastReadTime = currentTime;
916         idleCountForBoth.set(0);
917         idleCountForRead.set(0);
918 
919         if (getService() instanceof AbstractIoService) {
920             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
921         }
922     }
923 
924     /**
925      * Increase the number of written bytes
926      * 
927      * @param increment The number of written bytes
928      * @param currentTime The current time
929      */
930     public final void increaseWrittenBytes(int increment, long currentTime) {
931         if (increment <= 0) {
932             return;
933         }
934 
935         writtenBytes += increment;
936         lastWriteTime = currentTime;
937         idleCountForBoth.set(0);
938         idleCountForWrite.set(0);
939 
940         if (getService() instanceof AbstractIoService) {
941             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
942         }
943 
944         increaseScheduledWriteBytes(-increment);
945     }
946 
947     /**
948      * Increase the number of written messages
949      * 
950      * @param request The written message
951      * @param currentTime The current tile
952      */
953     public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
954         Object message = request.getMessage();
955 
956         if (message instanceof IoBuffer) {
957             IoBuffer b = (IoBuffer) message;
958 
959             if (b.hasRemaining()) {
960                 return;
961             }
962         }
963 
964         writtenMessages++;
965         lastWriteTime = currentTime;
966 
967         if (getService() instanceof AbstractIoService) {
968             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
969         }
970 
971         decreaseScheduledWriteMessages();
972     }
973 
974     /**
975      * Increase the number of scheduled write bytes for the session
976      * 
977      * @param increment The number of newly added bytes to write
978      */
979     public final void increaseScheduledWriteBytes(int increment) {
980         scheduledWriteBytes.addAndGet(increment);
981         if (getService() instanceof AbstractIoService) {
982             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
983         }
984     }
985 
986     /**
987      * Increase the number of scheduled message to write
988      */
989     public final void increaseScheduledWriteMessages() {
990         scheduledWriteMessages.incrementAndGet();
991         
992         if (getService() instanceof AbstractIoService) {
993             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
994         }
995     }
996 
997     /**
998      * Decrease the number of scheduled message written
999      */
1000     private void decreaseScheduledWriteMessages() {
1001         scheduledWriteMessages.decrementAndGet();
1002         if (getService() instanceof AbstractIoService) {
1003             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
1004         }
1005     }
1006 
1007     /**
1008      * Decrease the counters of written messages and written bytes when a message has been written
1009      * 
1010      * @param request The written message
1011      */
1012     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
1013         Object message = request.getMessage();
1014         
1015         if (message instanceof IoBuffer) {
1016             IoBuffer b = (IoBuffer) message;
1017             
1018             if (b.hasRemaining()) {
1019                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
1020             } else {
1021                 decreaseScheduledWriteMessages();
1022             }
1023         } else {
1024             decreaseScheduledWriteMessages();
1025         }
1026     }
1027 
1028     /**
1029      * {@inheritDoc}
1030      */
1031     public final WriteRequestQueue getWriteRequestQueue() {
1032         if (writeRequestQueue == null) {
1033             throw new IllegalStateException();
1034         }
1035         
1036         return writeRequestQueue;
1037     }
1038 
1039     /**
1040      * {@inheritDoc}
1041      */
1042     public final WriteRequest getCurrentWriteRequest() {
1043         return currentWriteRequest;
1044     }
1045 
1046     /**
1047      * {@inheritDoc}
1048      */
1049     public final Object getCurrentWriteMessage() {
1050         WriteRequest req = getCurrentWriteRequest();
1051         
1052         if (req == null) {
1053             return null;
1054         }
1055         return req.getMessage();
1056     }
1057 
1058     /**
1059      * {@inheritDoc}
1060      */
1061     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1062         this.currentWriteRequest = currentWriteRequest;
1063     }
1064 
1065     /**
1066      * Increase the ReadBuffer size (it will double)
1067      */
1068     public final void increaseReadBufferSize() {
1069         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1070         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1071             getConfig().setReadBufferSize(newReadBufferSize);
1072         } else {
1073             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1074         }
1075 
1076         deferDecreaseReadBuffer = true;
1077     }
1078 
1079     /**
1080      * Decrease the ReadBuffer size (it will be divided by a factor 2)
1081      */
1082     public final void decreaseReadBufferSize() {
1083         if (deferDecreaseReadBuffer) {
1084             deferDecreaseReadBuffer = false;
1085             return;
1086         }
1087 
1088         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1089             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1090         }
1091 
1092         deferDecreaseReadBuffer = true;
1093     }
1094 
1095     /**
1096      * {@inheritDoc}
1097      */
1098     public final long getCreationTime() {
1099         return creationTime;
1100     }
1101 
1102     /**
1103      * {@inheritDoc}
1104      */
1105     public final long getLastIoTime() {
1106         return Math.max(lastReadTime, lastWriteTime);
1107     }
1108 
1109     /**
1110      * {@inheritDoc}
1111      */
1112     public final long getLastReadTime() {
1113         return lastReadTime;
1114     }
1115 
1116     /**
1117      * {@inheritDoc}
1118      */
1119     public final long getLastWriteTime() {
1120         return lastWriteTime;
1121     }
1122 
1123     /**
1124      * {@inheritDoc}
1125      */
1126     public final boolean isIdle(IdleStatus status) {
1127         if (status == IdleStatus.BOTH_IDLE) {
1128             return idleCountForBoth.get() > 0;
1129         }
1130 
1131         if (status == IdleStatus.READER_IDLE) {
1132             return idleCountForRead.get() > 0;
1133         }
1134 
1135         if (status == IdleStatus.WRITER_IDLE) {
1136             return idleCountForWrite.get() > 0;
1137         }
1138 
1139         throw new IllegalArgumentException("Unknown idle status: " + status);
1140     }
1141 
1142     /**
1143      * {@inheritDoc}
1144      */
1145     public final boolean isBothIdle() {
1146         return isIdle(IdleStatus.BOTH_IDLE);
1147     }
1148 
1149     /**
1150      * {@inheritDoc}
1151      */
1152     public final boolean isReaderIdle() {
1153         return isIdle(IdleStatus.READER_IDLE);
1154     }
1155 
1156     /**
1157      * {@inheritDoc}
1158      */
1159     public final boolean isWriterIdle() {
1160         return isIdle(IdleStatus.WRITER_IDLE);
1161     }
1162 
1163     /**
1164      * {@inheritDoc}
1165      */
1166     public final int getIdleCount(IdleStatus status) {
1167         if (getConfig().getIdleTime(status) == 0) {
1168             if (status == IdleStatus.BOTH_IDLE) {
1169                 idleCountForBoth.set(0);
1170             }
1171 
1172             if (status == IdleStatus.READER_IDLE) {
1173                 idleCountForRead.set(0);
1174             }
1175 
1176             if (status == IdleStatus.WRITER_IDLE) {
1177                 idleCountForWrite.set(0);
1178             }
1179         }
1180 
1181         if (status == IdleStatus.BOTH_IDLE) {
1182             return idleCountForBoth.get();
1183         }
1184 
1185         if (status == IdleStatus.READER_IDLE) {
1186             return idleCountForRead.get();
1187         }
1188 
1189         if (status == IdleStatus.WRITER_IDLE) {
1190             return idleCountForWrite.get();
1191         }
1192 
1193         throw new IllegalArgumentException("Unknown idle status: " + status);
1194     }
1195 
1196     /**
1197      * {@inheritDoc}
1198      */
1199     public final long getLastIdleTime(IdleStatus status) {
1200         if (status == IdleStatus.BOTH_IDLE) {
1201             return lastIdleTimeForBoth;
1202         }
1203 
1204         if (status == IdleStatus.READER_IDLE) {
1205             return lastIdleTimeForRead;
1206         }
1207 
1208         if (status == IdleStatus.WRITER_IDLE) {
1209             return lastIdleTimeForWrite;
1210         }
1211 
1212         throw new IllegalArgumentException("Unknown idle status: " + status);
1213     }
1214 
1215     /**
1216      * Increase the count of the various Idle counter
1217      * 
1218      * @param status The current status
1219      * @param currentTime The current time
1220      */
1221     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1222         if (status == IdleStatus.BOTH_IDLE) {
1223             idleCountForBoth.incrementAndGet();
1224             lastIdleTimeForBoth = currentTime;
1225         } else if (status == IdleStatus.READER_IDLE) {
1226             idleCountForRead.incrementAndGet();
1227             lastIdleTimeForRead = currentTime;
1228         } else if (status == IdleStatus.WRITER_IDLE) {
1229             idleCountForWrite.incrementAndGet();
1230             lastIdleTimeForWrite = currentTime;
1231         } else {
1232             throw new IllegalArgumentException("Unknown idle status: " + status);
1233         }
1234     }
1235 
1236     /**
1237      * {@inheritDoc}
1238      */
1239     public final int getBothIdleCount() {
1240         return getIdleCount(IdleStatus.BOTH_IDLE);
1241     }
1242 
1243     /**
1244      * {@inheritDoc}
1245      */
1246     public final long getLastBothIdleTime() {
1247         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1248     }
1249 
1250     /**
1251      * {@inheritDoc}
1252      */
1253     public final long getLastReaderIdleTime() {
1254         return getLastIdleTime(IdleStatus.READER_IDLE);
1255     }
1256 
1257     /**
1258      * {@inheritDoc}
1259      */
1260     public final long getLastWriterIdleTime() {
1261         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1262     }
1263 
1264     /**
1265      * {@inheritDoc}
1266      */
1267     public final int getReaderIdleCount() {
1268         return getIdleCount(IdleStatus.READER_IDLE);
1269     }
1270 
1271     /**
1272      * {@inheritDoc}
1273      */
1274     public final int getWriterIdleCount() {
1275         return getIdleCount(IdleStatus.WRITER_IDLE);
1276     }
1277 
1278     /**
1279      * {@inheritDoc}
1280      */
1281     public SocketAddress getServiceAddress() {
1282         IoService service = getService();
1283         if (service instanceof IoAcceptor) {
1284             return ((IoAcceptor) service).getLocalAddress();
1285         }
1286 
1287         return getRemoteAddress();
1288     }
1289 
1290     /**
1291      * {@inheritDoc}
1292      */
1293     @Override
1294     public final int hashCode() {
1295         return super.hashCode();
1296     }
1297 
1298     /**
1299      * {@inheritDoc} TODO This is a ridiculous implementation. Need to be
1300      * replaced.
1301      */
1302     @Override
1303     public final boolean equals(Object o) {
1304         return super.equals(o);
1305     }
1306 
1307     /**
1308      * {@inheritDoc}
1309      */
1310     @Override
1311     public String toString() {
1312         if (isConnected() || isClosing()) {
1313             String remote = null;
1314             String local = null;
1315 
1316             try {
1317                 remote = String.valueOf(getRemoteAddress());
1318             } catch (Exception e) {
1319                 remote = "Cannot get the remote address informations: " + e.getMessage();
1320             }
1321 
1322             try {
1323                 local = String.valueOf(getLocalAddress());
1324             } catch (Exception e) {
1325             }
1326 
1327             if (getService() instanceof IoAcceptor) {
1328                 return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1329             }
1330 
1331             return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1332         }
1333 
1334         return "(" + getIdAsString() + ") Session disconnected ...";
1335     }
1336 
1337     /**
1338      * Get the Id as a String
1339      */
1340     private String getIdAsString() {
1341         String id = Long.toHexString(getId()).toUpperCase();
1342         
1343         if (id.length() <= 8) {
1344             return "0x00000000".substring(0, 10 - id.length()) + id;
1345         } else {
1346             return "0x" + id;
1347         }
1348     }
1349 
1350     /**
1351      * TGet the Service name
1352      */
1353     private String getServiceName() {
1354         TransportMetadata tm = getTransportMetadata();
1355         if (tm == null) {
1356             return "null";
1357         }
1358 
1359         return tm.getProviderName() + ' ' + tm.getName();
1360     }
1361 
1362     /**
1363      * {@inheritDoc}
1364      */
1365     public IoService getService() {
1366         return service;
1367     }
1368 
1369     /**
1370      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
1371      * in the specified collection.
1372      * 
1373      * @param sessions The sessions that are notified
1374      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1375      */
1376     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1377         while (sessions.hasNext()) {
1378             IoSession session = sessions.next();
1379             
1380             if (!session.getCloseFuture().isClosed()) {
1381                 notifyIdleSession(session, currentTime);
1382             }
1383         }
1384     }
1385 
1386     /**
1387      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1388      * specified {@code session}.
1389      * 
1390      * @param session The session that is notified
1391      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1392      */
1393     public static void notifyIdleSession(IoSession session, long currentTime) {
1394         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1395                 IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1396 
1397         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1398                 IdleStatus.READER_IDLE,
1399                 Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1400 
1401         notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1402                 IdleStatus.WRITER_IDLE,
1403                 Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1404 
1405         notifyWriteTimeout(session, currentTime);
1406     }
1407 
1408     private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1409             long lastIoTime) {
1410         if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1411             session.getFilterChain().fireSessionIdle(status);
1412         }
1413     }
1414 
1415     private static void notifyWriteTimeout(IoSession session, long currentTime) {
1416 
1417         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1418         if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1419                 && !session.getWriteRequestQueue().isEmpty(session)) {
1420             WriteRequest request = session.getCurrentWriteRequest();
1421             if (request != null) {
1422                 session.setCurrentWriteRequest(null);
1423                 WriteTimeoutException cause = new WriteTimeoutException(request);
1424                 request.getFuture().setException(cause);
1425                 session.getFilterChain().fireExceptionCaught(cause);
1426                 // WriteException is an IOException, so we close the session.
1427                 session.closeNow();
1428             }
1429         }
1430     }
1431 }