001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.mina.core.session;
021
022import java.io.File;
023import java.io.FileInputStream;
024import java.io.IOException;
025import java.net.SocketAddress;
026import java.nio.channels.FileChannel;
027import java.util.Iterator;
028import java.util.Queue;
029import java.util.Set;
030import java.util.concurrent.ConcurrentLinkedQueue;
031import java.util.concurrent.atomic.AtomicBoolean;
032import java.util.concurrent.atomic.AtomicInteger;
033import java.util.concurrent.atomic.AtomicLong;
034
035import org.apache.mina.core.buffer.IoBuffer;
036import org.apache.mina.core.file.DefaultFileRegion;
037import org.apache.mina.core.file.FilenameFileRegion;
038import org.apache.mina.core.filterchain.IoFilterChain;
039import org.apache.mina.core.future.CloseFuture;
040import org.apache.mina.core.future.DefaultCloseFuture;
041import org.apache.mina.core.future.DefaultReadFuture;
042import org.apache.mina.core.future.DefaultWriteFuture;
043import org.apache.mina.core.future.IoFutureListener;
044import org.apache.mina.core.future.ReadFuture;
045import org.apache.mina.core.future.WriteFuture;
046import org.apache.mina.core.service.AbstractIoService;
047import org.apache.mina.core.service.IoAcceptor;
048import org.apache.mina.core.service.IoHandler;
049import org.apache.mina.core.service.IoProcessor;
050import org.apache.mina.core.service.IoService;
051import org.apache.mina.core.service.TransportMetadata;
052import org.apache.mina.core.write.DefaultWriteRequest;
053import org.apache.mina.core.write.WriteException;
054import org.apache.mina.core.write.WriteRequest;
055import org.apache.mina.core.write.WriteRequestQueue;
056import org.apache.mina.core.write.WriteTimeoutException;
057import org.apache.mina.core.write.WriteToClosedSessionException;
058import org.apache.mina.util.ExceptionMonitor;
059
060/**
061 * Base implementation of {@link IoSession}.
062 * 
063 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
064 */
065public abstract class AbstractIoSession implements IoSession {
066    /** The associated handler */
067    private final IoHandler handler;
068
069    /** The session config */
070    protected IoSessionConfig config;
071
072    /** The service which will manage this session */
073    private final IoService service;
074
075    private static final AttributeKey READY_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
076            "readyReadFutures");
077
078    private static final AttributeKey WAITING_READ_FUTURES_KEY = new AttributeKey(AbstractIoSession.class,
079            "waitingReadFutures");
080
081    private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER = new IoFutureListener<CloseFuture>() {
082        public void operationComplete(CloseFuture future) {
083            AbstractIoSession session = (AbstractIoSession) future.getSession();
084            session.scheduledWriteBytes.set(0);
085            session.scheduledWriteMessages.set(0);
086            session.readBytesThroughput = 0;
087            session.readMessagesThroughput = 0;
088            session.writtenBytesThroughput = 0;
089            session.writtenMessagesThroughput = 0;
090        }
091    };
092
093    /**
094     * An internal write request object that triggers session close.
095     * 
096     * @see #writeRequestQueue
097     */
098    public static final WriteRequest CLOSE_REQUEST = new DefaultWriteRequest(new Object());
099
100    /**
101     * An internal write request object that triggers message sent events.
102     * 
103     * @see #writeRequestQueue
104     */
105    public static final WriteRequest MESSAGE_SENT_REQUEST = new DefaultWriteRequest(DefaultWriteRequest.EMPTY_MESSAGE);
106
107    private final Object lock = new Object();
108
109    private IoSessionAttributeMap attributes;
110
111    private WriteRequestQueue writeRequestQueue;
112
113    private WriteRequest currentWriteRequest;
114
115    /** The Session creation's time */
116    private final long creationTime;
117
118    /** An id generator guaranteed to generate unique IDs for the session */
119    private static AtomicLong idGenerator = new AtomicLong(0);
120
121    /** The session ID */
122    private long sessionId;
123
124    /**
125     * A future that will be set 'closed' when the connection is closed.
126     */
127    private final CloseFuture closeFuture = new DefaultCloseFuture(this);
128
129    private volatile boolean closing;
130
131    // traffic control
132    private boolean readSuspended = false;
133
134    private boolean writeSuspended = false;
135
136    // Status variables
137    private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
138
139    private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
140
141    private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
142
143    private long readBytes;
144
145    private long writtenBytes;
146
147    private long readMessages;
148
149    private long writtenMessages;
150
151    private long lastReadTime;
152
153    private long lastWriteTime;
154
155    private long lastThroughputCalculationTime;
156
157    private long lastReadBytes;
158
159    private long lastWrittenBytes;
160
161    private long lastReadMessages;
162
163    private long lastWrittenMessages;
164
165    private double readBytesThroughput;
166
167    private double writtenBytesThroughput;
168
169    private double readMessagesThroughput;
170
171    private double writtenMessagesThroughput;
172
173    private AtomicInteger idleCountForBoth = new AtomicInteger();
174
175    private AtomicInteger idleCountForRead = new AtomicInteger();
176
177    private AtomicInteger idleCountForWrite = new AtomicInteger();
178
179    private long lastIdleTimeForBoth;
180
181    private long lastIdleTimeForRead;
182
183    private long lastIdleTimeForWrite;
184
185    private boolean deferDecreaseReadBuffer = true;
186
187    /**
188     * Create a Session for a service
189     * 
190     * @param service the Service for this session
191     */
192    protected AbstractIoSession(IoService service) {
193        this.service = service;
194        this.handler = service.getHandler();
195
196        // Initialize all the Session counters to the current time
197        long currentTime = System.currentTimeMillis();
198        creationTime = currentTime;
199        lastThroughputCalculationTime = currentTime;
200        lastReadTime = currentTime;
201        lastWriteTime = currentTime;
202        lastIdleTimeForBoth = currentTime;
203        lastIdleTimeForRead = currentTime;
204        lastIdleTimeForWrite = currentTime;
205
206        // TODO add documentation
207        closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
208
209        // Set a new ID for this session
210        sessionId = idGenerator.incrementAndGet();
211    }
212
213    /**
214     * {@inheritDoc}
215     * 
216     * We use an AtomicLong to guarantee that the session ID are unique.
217     */
218    public final long getId() {
219        return sessionId;
220    }
221
222    /**
223     * @return The associated IoProcessor for this session
224     */
225    public abstract IoProcessor getProcessor();
226
227    /**
228     * {@inheritDoc}
229     */
230    public final boolean isConnected() {
231        return !closeFuture.isClosed();
232    }
233
234    /**
235     * {@inheritDoc}
236     */
237    public boolean isActive() {
238        // Return true by default
239        return true;
240    }
241
242    /**
243     * {@inheritDoc}
244     */
245    public final boolean isClosing() {
246        return closing || closeFuture.isClosed();
247    }
248
249    /**
250     * {@inheritDoc}
251     */
252    public boolean isSecured() {
253        // Always false...
254        return false;
255    }
256
257    /**
258     * {@inheritDoc}
259     */
260    public final CloseFuture getCloseFuture() {
261        return closeFuture;
262    }
263
264    /**
265     * Tells if the session is scheduled for flushed
266     * 
267     * @return true if the session is scheduled for flush
268     */
269    public final boolean isScheduledForFlush() {
270        return scheduledForFlush.get();
271    }
272
273    /**
274     * Schedule the session for flushed
275     */
276    public final void scheduledForFlush() {
277        scheduledForFlush.set(true);
278    }
279
280    /**
281     * Change the session's status : it's not anymore scheduled for flush
282     */
283    public final void unscheduledForFlush() {
284        scheduledForFlush.set(false);
285    }
286
287    /**
288     * Set the scheduledForFLush flag. As we may have concurrent access to this
289     * flag, we compare and set it in one call.
290     * 
291     * @param schedule
292     *            the new value to set if not already set.
293     * @return true if the session flag has been set, and if it wasn't set
294     *         already.
295     */
296    public final boolean setScheduledForFlush(boolean schedule) {
297        if (schedule) {
298            // If the current tag is set to false, switch it to true,
299            // otherwise, we do nothing but return false : the session
300            // is already scheduled for flush
301            return scheduledForFlush.compareAndSet(false, schedule);
302        }
303
304        scheduledForFlush.set(schedule);
305        return true;
306    }
307
308    /**
309     * {@inheritDoc}
310     */
311    public final CloseFuture close(boolean rightNow) {
312        if (rightNow) {
313            return closeNow();
314        } else {
315            return closeOnFlush();
316        }
317    }
318
319    /**
320     * {@inheritDoc}
321     */
322    public final CloseFuture close() {
323        return closeNow();
324    }
325
326    /**
327     * {@inheritDoc}
328     */
329    public final CloseFuture closeOnFlush() {
330        if (!isClosing()) {
331            getWriteRequestQueue().offer(this, CLOSE_REQUEST);
332            getProcessor().flush(this);
333        }
334        
335        return closeFuture;
336    }
337
338    /**
339     * {@inheritDoc}
340     */
341    public final CloseFuture closeNow() {
342        synchronized (lock) {
343            if (isClosing()) {
344                return closeFuture;
345            }
346
347            closing = true;
348            
349            try {
350                destroy();
351            } catch (Exception e) {
352                IoFilterChain filterChain = getFilterChain();
353                filterChain.fireExceptionCaught(e);
354            }
355        }
356
357        getFilterChain().fireFilterClose();
358
359        return closeFuture;
360    }
361    
362    /**
363     * Destroy the session
364     */
365    protected void destroy() {
366        if (writeRequestQueue != null) {
367            while (!writeRequestQueue.isEmpty(this)) {
368                WriteRequest writeRequest = writeRequestQueue.poll(this);
369                
370                if (writeRequest != null) {
371                    WriteFuture writeFuture = writeRequest.getFuture();
372                    
373                    // The WriteRequest may not always have a future : The CLOSE_REQUEST
374                    // and MESSAGE_SENT_REQUEST don't.
375                    if (writeFuture != null) {
376                        writeFuture.setWritten();
377                    }
378                }
379            }
380        }
381    }
382
383    /**
384     * {@inheritDoc}
385     */
386    public IoHandler getHandler() {
387        return handler;
388    }
389
390    /**
391     * {@inheritDoc}
392     */
393    public IoSessionConfig getConfig() {
394        return config;
395    }
396
397    /**
398     * {@inheritDoc}
399     */
400    public final ReadFuture read() {
401        if (!getConfig().isUseReadOperation()) {
402            throw new IllegalStateException("useReadOperation is not enabled.");
403        }
404
405        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
406        ReadFuture future;
407        
408        synchronized (readyReadFutures) {
409            future = readyReadFutures.poll();
410            
411            if (future != null) {
412                if (future.isClosed()) {
413                    // Let other readers get notified.
414                    readyReadFutures.offer(future);
415                }
416            } else {
417                future = new DefaultReadFuture(this);
418                getWaitingReadFutures().offer(future);
419            }
420        }
421
422        return future;
423    }
424
425    /**
426     * Associates a message to a ReadFuture
427     * 
428     * @param message the message to associate to the ReadFuture
429     * 
430     */
431    public final void offerReadFuture(Object message) {
432        newReadFuture().setRead(message);
433    }
434
435    /**
436     * Associates a failure to a ReadFuture
437     * 
438     * @param exception the exception to associate to the ReadFuture
439     */
440    public final void offerFailedReadFuture(Throwable exception) {
441        newReadFuture().setException(exception);
442    }
443
444    /**
445     * Inform the ReadFuture that the session has been closed
446     */
447    public final void offerClosedReadFuture() {
448        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
449        
450        synchronized (readyReadFutures) {
451            newReadFuture().setClosed();
452        }
453    }
454
455    /**
456     * @return a readFuture get from the waiting ReadFuture
457     */
458    private ReadFuture newReadFuture() {
459        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
460        Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
461        ReadFuture future;
462        
463        synchronized (readyReadFutures) {
464            future = waitingReadFutures.poll();
465            
466            if (future == null) {
467                future = new DefaultReadFuture(this);
468                readyReadFutures.offer(future);
469            }
470        }
471        
472        return future;
473    }
474
475    /**
476     * @return a queue of ReadFuture
477     */
478    private Queue<ReadFuture> getReadyReadFutures() {
479        Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
480        
481        if (readyReadFutures == null) {
482            readyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
483
484            Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY,
485                    readyReadFutures);
486            
487            if (oldReadyReadFutures != null) {
488                readyReadFutures = oldReadyReadFutures;
489            }
490        }
491        
492        return readyReadFutures;
493    }
494
495    /**
496     * @return the queue of waiting ReadFuture
497     */
498    private Queue<ReadFuture> getWaitingReadFutures() {
499        Queue<ReadFuture> waitingReadyReadFutures = (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
500        
501        if (waitingReadyReadFutures == null) {
502            waitingReadyReadFutures = new ConcurrentLinkedQueue<ReadFuture>();
503
504            Queue<ReadFuture> oldWaitingReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(
505                    WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
506            
507            if (oldWaitingReadyReadFutures != null) {
508                waitingReadyReadFutures = oldWaitingReadyReadFutures;
509            }
510        }
511        
512        return waitingReadyReadFutures;
513    }
514
515    /**
516     * {@inheritDoc}
517     */
518    public WriteFuture write(Object message) {
519        return write(message, null);
520    }
521
522    /**
523     * {@inheritDoc}
524     */
525    public WriteFuture write(Object message, SocketAddress remoteAddress) {
526        if (message == null) {
527            throw new IllegalArgumentException("Trying to write a null message : not allowed");
528        }
529
530        // We can't send a message to a connected session if we don't have
531        // the remote address
532        if (!getTransportMetadata().isConnectionless() && (remoteAddress != null)) {
533            throw new UnsupportedOperationException();
534        }
535
536        // If the session has been closed or is closing, we can't either
537        // send a message to the remote side. We generate a future
538        // containing an exception.
539        if (isClosing() || !isConnected()) {
540            WriteFuture future = new DefaultWriteFuture(this);
541            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
542            WriteException writeException = new WriteToClosedSessionException(request);
543            future.setException(writeException);
544            return future;
545        }
546
547        FileChannel openedFileChannel = null;
548
549        // TODO: remove this code as soon as we use InputStream
550        // instead of Object for the message.
551        try {
552            if ((message instanceof IoBuffer) && !((IoBuffer) message).hasRemaining()) {
553                // Nothing to write : probably an error in the user code
554                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
555            } else if (message instanceof FileChannel) {
556                FileChannel fileChannel = (FileChannel) message;
557                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
558            } else if (message instanceof File) {
559                File file = (File) message;
560                openedFileChannel = new FileInputStream(file).getChannel();
561                message = new FilenameFileRegion(file, openedFileChannel, 0, openedFileChannel.size());
562            }
563        } catch (IOException e) {
564            ExceptionMonitor.getInstance().exceptionCaught(e);
565            return DefaultWriteFuture.newNotWrittenFuture(this, e);
566        }
567
568        // Now, we can write the message. First, create a future
569        WriteFuture writeFuture = new DefaultWriteFuture(this);
570        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
571
572        // Then, get the chain and inject the WriteRequest into it
573        IoFilterChain filterChain = getFilterChain();
574        filterChain.fireFilterWrite(writeRequest);
575
576        // TODO : This is not our business ! The caller has created a
577        // FileChannel,
578        // he has to close it !
579        if (openedFileChannel != null) {
580            // If we opened a FileChannel, it needs to be closed when the write
581            // has completed
582            final FileChannel finalChannel = openedFileChannel;
583            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
584                public void operationComplete(WriteFuture future) {
585                    try {
586                        finalChannel.close();
587                    } catch (IOException e) {
588                        ExceptionMonitor.getInstance().exceptionCaught(e);
589                    }
590                }
591            });
592        }
593
594        // Return the WriteFuture.
595        return writeFuture;
596    }
597
598    /**
599     * {@inheritDoc}
600     */
601    public final Object getAttachment() {
602        return getAttribute("");
603    }
604
605    /**
606     * {@inheritDoc}
607     */
608    public final Object setAttachment(Object attachment) {
609        return setAttribute("", attachment);
610    }
611
612    /**
613     * {@inheritDoc}
614     */
615    public final Object getAttribute(Object key) {
616        return getAttribute(key, null);
617    }
618
619    /**
620     * {@inheritDoc}
621     */
622    public final Object getAttribute(Object key, Object defaultValue) {
623        return attributes.getAttribute(this, key, defaultValue);
624    }
625
626    /**
627     * {@inheritDoc}
628     */
629    public final Object setAttribute(Object key, Object value) {
630        return attributes.setAttribute(this, key, value);
631    }
632
633    /**
634     * {@inheritDoc}
635     */
636    public final Object setAttribute(Object key) {
637        return setAttribute(key, Boolean.TRUE);
638    }
639
640    /**
641     * {@inheritDoc}
642     */
643    public final Object setAttributeIfAbsent(Object key, Object value) {
644        return attributes.setAttributeIfAbsent(this, key, value);
645    }
646
647    /**
648     * {@inheritDoc}
649     */
650    public final Object setAttributeIfAbsent(Object key) {
651        return setAttributeIfAbsent(key, Boolean.TRUE);
652    }
653
654    /**
655     * {@inheritDoc}
656     */
657    public final Object removeAttribute(Object key) {
658        return attributes.removeAttribute(this, key);
659    }
660
661    /**
662     * {@inheritDoc}
663     */
664    public final boolean removeAttribute(Object key, Object value) {
665        return attributes.removeAttribute(this, key, value);
666    }
667
668    /**
669     * {@inheritDoc}
670     */
671    public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
672        return attributes.replaceAttribute(this, key, oldValue, newValue);
673    }
674
675    /**
676     * {@inheritDoc}
677     */
678    public final boolean containsAttribute(Object key) {
679        return attributes.containsAttribute(this, key);
680    }
681
682    /**
683     * {@inheritDoc}
684     */
685    public final Set<Object> getAttributeKeys() {
686        return attributes.getAttributeKeys(this);
687    }
688
689    /**
690     * @return The map of attributes associated with the session
691     */
692    public final IoSessionAttributeMap getAttributeMap() {
693        return attributes;
694    }
695
696    /**
697     * Set the map of attributes associated with the session
698     * 
699     * @param attributes The Map of attributes
700     */
701    public final void setAttributeMap(IoSessionAttributeMap attributes) {
702        this.attributes = attributes;
703    }
704
705    /**
706     * Create a new close aware write queue, based on the given write queue.
707     * 
708     * @param writeRequestQueue The write request queue
709     */
710    public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
711        this.writeRequestQueue = writeRequestQueue;
712    }
713
714    /**
715     * {@inheritDoc}
716     */
717    public final void suspendRead() {
718        readSuspended = true;
719        if (isClosing() || !isConnected()) {
720            return;
721        }
722        getProcessor().updateTrafficControl(this);
723    }
724
725    /**
726     * {@inheritDoc}
727     */
728    public final void suspendWrite() {
729        writeSuspended = true;
730        if (isClosing() || !isConnected()) {
731            return;
732        }
733        getProcessor().updateTrafficControl(this);
734    }
735
736    /**
737     * {@inheritDoc}
738     */
739    @SuppressWarnings("unchecked")
740    public final void resumeRead() {
741        readSuspended = false;
742        if (isClosing() || !isConnected()) {
743            return;
744        }
745        getProcessor().updateTrafficControl(this);
746    }
747
748    /**
749     * {@inheritDoc}
750     */
751    @SuppressWarnings("unchecked")
752    public final void resumeWrite() {
753        writeSuspended = false;
754        if (isClosing() || !isConnected()) {
755            return;
756        }
757        getProcessor().updateTrafficControl(this);
758    }
759
760    /**
761     * {@inheritDoc}
762     */
763    public boolean isReadSuspended() {
764        return readSuspended;
765    }
766
767    /**
768     * {@inheritDoc}
769     */
770    public boolean isWriteSuspended() {
771        return writeSuspended;
772    }
773
774    /**
775     * {@inheritDoc}
776     */
777    public final long getReadBytes() {
778        return readBytes;
779    }
780
781    /**
782     * {@inheritDoc}
783     */
784    public final long getWrittenBytes() {
785        return writtenBytes;
786    }
787
788    /**
789     * {@inheritDoc}
790     */
791    public final long getReadMessages() {
792        return readMessages;
793    }
794
795    /**
796     * {@inheritDoc}
797     */
798    public final long getWrittenMessages() {
799        return writtenMessages;
800    }
801
802    /**
803     * {@inheritDoc}
804     */
805    public final double getReadBytesThroughput() {
806        return readBytesThroughput;
807    }
808
809    /**
810     * {@inheritDoc}
811     */
812    public final double getWrittenBytesThroughput() {
813        return writtenBytesThroughput;
814    }
815
816    /**
817     * {@inheritDoc}
818     */
819    public final double getReadMessagesThroughput() {
820        return readMessagesThroughput;
821    }
822
823    /**
824     * {@inheritDoc}
825     */
826    public final double getWrittenMessagesThroughput() {
827        return writtenMessagesThroughput;
828    }
829
830    /**
831     * {@inheritDoc}
832     */
833    public final void updateThroughput(long currentTime, boolean force) {
834        int interval = (int) (currentTime - lastThroughputCalculationTime);
835
836        long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
837
838        if (((minInterval == 0) || (interval < minInterval)) && !force) {
839            return;
840        }
841
842        readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
843        writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
844        readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
845        writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
846
847        lastReadBytes = readBytes;
848        lastWrittenBytes = writtenBytes;
849        lastReadMessages = readMessages;
850        lastWrittenMessages = writtenMessages;
851
852        lastThroughputCalculationTime = currentTime;
853    }
854
855    /**
856     * {@inheritDoc}
857     */
858    public final long getScheduledWriteBytes() {
859        return scheduledWriteBytes.get();
860    }
861
862    /**
863     * {@inheritDoc}
864     */
865    public final int getScheduledWriteMessages() {
866        return scheduledWriteMessages.get();
867    }
868
869    /**
870     * Set the number of scheduled write bytes
871     * 
872     * @param byteCount The number of scheduled bytes for write
873     */
874    protected void setScheduledWriteBytes(int byteCount) {
875        scheduledWriteBytes.set(byteCount);
876    }
877
878    /**
879     * Set the number of scheduled write messages
880     * 
881     * @param messages The number of scheduled messages for write
882     */
883    protected void setScheduledWriteMessages(int messages) {
884        scheduledWriteMessages.set(messages);
885    }
886
887    /**
888     * Increase the number of read bytes
889     * 
890     * @param increment The number of read bytes
891     * @param currentTime The current time
892     */
893    public final void increaseReadBytes(long increment, long currentTime) {
894        if (increment <= 0) {
895            return;
896        }
897
898        readBytes += increment;
899        lastReadTime = currentTime;
900        idleCountForBoth.set(0);
901        idleCountForRead.set(0);
902
903        if (getService() instanceof AbstractIoService) {
904            ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
905        }
906    }
907
908    /**
909     * Increase the number of read messages
910     * 
911     * @param currentTime The current time
912     */
913    public final void increaseReadMessages(long currentTime) {
914        readMessages++;
915        lastReadTime = currentTime;
916        idleCountForBoth.set(0);
917        idleCountForRead.set(0);
918
919        if (getService() instanceof AbstractIoService) {
920            ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
921        }
922    }
923
924    /**
925     * Increase the number of written bytes
926     * 
927     * @param increment The number of written bytes
928     * @param currentTime The current time
929     */
930    public final void increaseWrittenBytes(int increment, long currentTime) {
931        if (increment <= 0) {
932            return;
933        }
934
935        writtenBytes += increment;
936        lastWriteTime = currentTime;
937        idleCountForBoth.set(0);
938        idleCountForWrite.set(0);
939
940        if (getService() instanceof AbstractIoService) {
941            ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
942        }
943
944        increaseScheduledWriteBytes(-increment);
945    }
946
947    /**
948     * Increase the number of written messages
949     * 
950     * @param request The written message
951     * @param currentTime The current tile
952     */
953    public final void increaseWrittenMessages(WriteRequest request, long currentTime) {
954        Object message = request.getMessage();
955
956        if (message instanceof IoBuffer) {
957            IoBuffer b = (IoBuffer) message;
958
959            if (b.hasRemaining()) {
960                return;
961            }
962        }
963
964        writtenMessages++;
965        lastWriteTime = currentTime;
966
967        if (getService() instanceof AbstractIoService) {
968            ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
969        }
970
971        decreaseScheduledWriteMessages();
972    }
973
974    /**
975     * Increase the number of scheduled write bytes for the session
976     * 
977     * @param increment The number of newly added bytes to write
978     */
979    public final void increaseScheduledWriteBytes(int increment) {
980        scheduledWriteBytes.addAndGet(increment);
981        if (getService() instanceof AbstractIoService) {
982            ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
983        }
984    }
985
986    /**
987     * Increase the number of scheduled message to write
988     */
989    public final void increaseScheduledWriteMessages() {
990        scheduledWriteMessages.incrementAndGet();
991        
992        if (getService() instanceof AbstractIoService) {
993            ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
994        }
995    }
996
997    /**
998     * Decrease the number of scheduled message written
999     */
1000    private void decreaseScheduledWriteMessages() {
1001        scheduledWriteMessages.decrementAndGet();
1002        if (getService() instanceof AbstractIoService) {
1003            ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
1004        }
1005    }
1006
1007    /**
1008     * Decrease the counters of written messages and written bytes when a message has been written
1009     * 
1010     * @param request The written message
1011     */
1012    public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
1013        Object message = request.getMessage();
1014        
1015        if (message instanceof IoBuffer) {
1016            IoBuffer b = (IoBuffer) message;
1017            
1018            if (b.hasRemaining()) {
1019                increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
1020            } else {
1021                decreaseScheduledWriteMessages();
1022            }
1023        } else {
1024            decreaseScheduledWriteMessages();
1025        }
1026    }
1027
1028    /**
1029     * {@inheritDoc}
1030     */
1031    public final WriteRequestQueue getWriteRequestQueue() {
1032        if (writeRequestQueue == null) {
1033            throw new IllegalStateException();
1034        }
1035        
1036        return writeRequestQueue;
1037    }
1038
1039    /**
1040     * {@inheritDoc}
1041     */
1042    public final WriteRequest getCurrentWriteRequest() {
1043        return currentWriteRequest;
1044    }
1045
1046    /**
1047     * {@inheritDoc}
1048     */
1049    public final Object getCurrentWriteMessage() {
1050        WriteRequest req = getCurrentWriteRequest();
1051        
1052        if (req == null) {
1053            return null;
1054        }
1055        return req.getMessage();
1056    }
1057
1058    /**
1059     * {@inheritDoc}
1060     */
1061    public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
1062        this.currentWriteRequest = currentWriteRequest;
1063    }
1064
1065    /**
1066     * Increase the ReadBuffer size (it will double)
1067     */
1068    public final void increaseReadBufferSize() {
1069        int newReadBufferSize = getConfig().getReadBufferSize() << 1;
1070        if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
1071            getConfig().setReadBufferSize(newReadBufferSize);
1072        } else {
1073            getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
1074        }
1075
1076        deferDecreaseReadBuffer = true;
1077    }
1078
1079    /**
1080     * Decrease the ReadBuffer size (it will be divided by a factor 2)
1081     */
1082    public final void decreaseReadBufferSize() {
1083        if (deferDecreaseReadBuffer) {
1084            deferDecreaseReadBuffer = false;
1085            return;
1086        }
1087
1088        if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
1089            getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
1090        }
1091
1092        deferDecreaseReadBuffer = true;
1093    }
1094
1095    /**
1096     * {@inheritDoc}
1097     */
1098    public final long getCreationTime() {
1099        return creationTime;
1100    }
1101
1102    /**
1103     * {@inheritDoc}
1104     */
1105    public final long getLastIoTime() {
1106        return Math.max(lastReadTime, lastWriteTime);
1107    }
1108
1109    /**
1110     * {@inheritDoc}
1111     */
1112    public final long getLastReadTime() {
1113        return lastReadTime;
1114    }
1115
1116    /**
1117     * {@inheritDoc}
1118     */
1119    public final long getLastWriteTime() {
1120        return lastWriteTime;
1121    }
1122
1123    /**
1124     * {@inheritDoc}
1125     */
1126    public final boolean isIdle(IdleStatus status) {
1127        if (status == IdleStatus.BOTH_IDLE) {
1128            return idleCountForBoth.get() > 0;
1129        }
1130
1131        if (status == IdleStatus.READER_IDLE) {
1132            return idleCountForRead.get() > 0;
1133        }
1134
1135        if (status == IdleStatus.WRITER_IDLE) {
1136            return idleCountForWrite.get() > 0;
1137        }
1138
1139        throw new IllegalArgumentException("Unknown idle status: " + status);
1140    }
1141
1142    /**
1143     * {@inheritDoc}
1144     */
1145    public final boolean isBothIdle() {
1146        return isIdle(IdleStatus.BOTH_IDLE);
1147    }
1148
1149    /**
1150     * {@inheritDoc}
1151     */
1152    public final boolean isReaderIdle() {
1153        return isIdle(IdleStatus.READER_IDLE);
1154    }
1155
1156    /**
1157     * {@inheritDoc}
1158     */
1159    public final boolean isWriterIdle() {
1160        return isIdle(IdleStatus.WRITER_IDLE);
1161    }
1162
1163    /**
1164     * {@inheritDoc}
1165     */
1166    public final int getIdleCount(IdleStatus status) {
1167        if (getConfig().getIdleTime(status) == 0) {
1168            if (status == IdleStatus.BOTH_IDLE) {
1169                idleCountForBoth.set(0);
1170            }
1171
1172            if (status == IdleStatus.READER_IDLE) {
1173                idleCountForRead.set(0);
1174            }
1175
1176            if (status == IdleStatus.WRITER_IDLE) {
1177                idleCountForWrite.set(0);
1178            }
1179        }
1180
1181        if (status == IdleStatus.BOTH_IDLE) {
1182            return idleCountForBoth.get();
1183        }
1184
1185        if (status == IdleStatus.READER_IDLE) {
1186            return idleCountForRead.get();
1187        }
1188
1189        if (status == IdleStatus.WRITER_IDLE) {
1190            return idleCountForWrite.get();
1191        }
1192
1193        throw new IllegalArgumentException("Unknown idle status: " + status);
1194    }
1195
1196    /**
1197     * {@inheritDoc}
1198     */
1199    public final long getLastIdleTime(IdleStatus status) {
1200        if (status == IdleStatus.BOTH_IDLE) {
1201            return lastIdleTimeForBoth;
1202        }
1203
1204        if (status == IdleStatus.READER_IDLE) {
1205            return lastIdleTimeForRead;
1206        }
1207
1208        if (status == IdleStatus.WRITER_IDLE) {
1209            return lastIdleTimeForWrite;
1210        }
1211
1212        throw new IllegalArgumentException("Unknown idle status: " + status);
1213    }
1214
1215    /**
1216     * Increase the count of the various Idle counter
1217     * 
1218     * @param status The current status
1219     * @param currentTime The current time
1220     */
1221    public final void increaseIdleCount(IdleStatus status, long currentTime) {
1222        if (status == IdleStatus.BOTH_IDLE) {
1223            idleCountForBoth.incrementAndGet();
1224            lastIdleTimeForBoth = currentTime;
1225        } else if (status == IdleStatus.READER_IDLE) {
1226            idleCountForRead.incrementAndGet();
1227            lastIdleTimeForRead = currentTime;
1228        } else if (status == IdleStatus.WRITER_IDLE) {
1229            idleCountForWrite.incrementAndGet();
1230            lastIdleTimeForWrite = currentTime;
1231        } else {
1232            throw new IllegalArgumentException("Unknown idle status: " + status);
1233        }
1234    }
1235
1236    /**
1237     * {@inheritDoc}
1238     */
1239    public final int getBothIdleCount() {
1240        return getIdleCount(IdleStatus.BOTH_IDLE);
1241    }
1242
1243    /**
1244     * {@inheritDoc}
1245     */
1246    public final long getLastBothIdleTime() {
1247        return getLastIdleTime(IdleStatus.BOTH_IDLE);
1248    }
1249
1250    /**
1251     * {@inheritDoc}
1252     */
1253    public final long getLastReaderIdleTime() {
1254        return getLastIdleTime(IdleStatus.READER_IDLE);
1255    }
1256
1257    /**
1258     * {@inheritDoc}
1259     */
1260    public final long getLastWriterIdleTime() {
1261        return getLastIdleTime(IdleStatus.WRITER_IDLE);
1262    }
1263
1264    /**
1265     * {@inheritDoc}
1266     */
1267    public final int getReaderIdleCount() {
1268        return getIdleCount(IdleStatus.READER_IDLE);
1269    }
1270
1271    /**
1272     * {@inheritDoc}
1273     */
1274    public final int getWriterIdleCount() {
1275        return getIdleCount(IdleStatus.WRITER_IDLE);
1276    }
1277
1278    /**
1279     * {@inheritDoc}
1280     */
1281    public SocketAddress getServiceAddress() {
1282        IoService service = getService();
1283        if (service instanceof IoAcceptor) {
1284            return ((IoAcceptor) service).getLocalAddress();
1285        }
1286
1287        return getRemoteAddress();
1288    }
1289
1290    /**
1291     * {@inheritDoc}
1292     */
1293    @Override
1294    public final int hashCode() {
1295        return super.hashCode();
1296    }
1297
1298    /**
1299     * {@inheritDoc} TODO This is a ridiculous implementation. Need to be
1300     * replaced.
1301     */
1302    @Override
1303    public final boolean equals(Object o) {
1304        return super.equals(o);
1305    }
1306
1307    /**
1308     * {@inheritDoc}
1309     */
1310    @Override
1311    public String toString() {
1312        if (isConnected() || isClosing()) {
1313            String remote = null;
1314            String local = null;
1315
1316            try {
1317                remote = String.valueOf(getRemoteAddress());
1318            } catch (Exception e) {
1319                remote = "Cannot get the remote address informations: " + e.getMessage();
1320            }
1321
1322            try {
1323                local = String.valueOf(getLocalAddress());
1324            } catch (Exception e) {
1325            }
1326
1327            if (getService() instanceof IoAcceptor) {
1328                return "(" + getIdAsString() + ": " + getServiceName() + ", server, " + remote + " => " + local + ')';
1329            }
1330
1331            return "(" + getIdAsString() + ": " + getServiceName() + ", client, " + local + " => " + remote + ')';
1332        }
1333
1334        return "(" + getIdAsString() + ") Session disconnected ...";
1335    }
1336
1337    /**
1338     * Get the Id as a String
1339     */
1340    private String getIdAsString() {
1341        String id = Long.toHexString(getId()).toUpperCase();
1342        
1343        if (id.length() <= 8) {
1344            return "0x00000000".substring(0, 10 - id.length()) + id;
1345        } else {
1346            return "0x" + id;
1347        }
1348    }
1349
1350    /**
1351     * TGet the Service name
1352     */
1353    private String getServiceName() {
1354        TransportMetadata tm = getTransportMetadata();
1355        if (tm == null) {
1356            return "null";
1357        }
1358
1359        return tm.getProviderName() + ' ' + tm.getName();
1360    }
1361
1362    /**
1363     * {@inheritDoc}
1364     */
1365    public IoService getService() {
1366        return service;
1367    }
1368
1369    /**
1370     * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable sessions
1371     * in the specified collection.
1372     * 
1373     * @param sessions The sessions that are notified
1374     * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1375     */
1376    public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1377        while (sessions.hasNext()) {
1378            IoSession session = sessions.next();
1379            
1380            if (!session.getCloseFuture().isClosed()) {
1381                notifyIdleSession(session, currentTime);
1382            }
1383        }
1384    }
1385
1386    /**
1387     * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1388     * specified {@code session}.
1389     * 
1390     * @param session The session that is notified
1391     * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1392     */
1393    public static void notifyIdleSession(IoSession session, long currentTime) {
1394        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1395                IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1396
1397        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1398                IdleStatus.READER_IDLE,
1399                Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
1400
1401        notifyIdleSession0(session, currentTime, session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1402                IdleStatus.WRITER_IDLE,
1403                Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1404
1405        notifyWriteTimeout(session, currentTime);
1406    }
1407
1408    private static void notifyIdleSession0(IoSession session, long currentTime, long idleTime, IdleStatus status,
1409            long lastIoTime) {
1410        if ((idleTime > 0) && (lastIoTime != 0) && (currentTime - lastIoTime >= idleTime)) {
1411            session.getFilterChain().fireSessionIdle(status);
1412        }
1413    }
1414
1415    private static void notifyWriteTimeout(IoSession session, long currentTime) {
1416
1417        long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1418        if ((writeTimeout > 0) && (currentTime - session.getLastWriteTime() >= writeTimeout)
1419                && !session.getWriteRequestQueue().isEmpty(session)) {
1420            WriteRequest request = session.getCurrentWriteRequest();
1421            if (request != null) {
1422                session.setCurrentWriteRequest(null);
1423                WriteTimeoutException cause = new WriteTimeoutException(request);
1424                request.getFuture().setException(cause);
1425                session.getFilterChain().fireExceptionCaught(cause);
1426                // WriteException is an IOException, so we close the session.
1427                session.closeNow();
1428            }
1429        }
1430    }
1431}