View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.session;
21  
22  import java.io.File;
23  import java.io.FileInputStream;
24  import java.io.IOException;
25  import java.net.SocketAddress;
26  import java.nio.channels.FileChannel;
27  import java.util.Iterator;
28  import java.util.Queue;
29  import java.util.Set;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.atomic.AtomicInteger;
32  import java.util.concurrent.atomic.AtomicLong;
33  
34  import org.apache.mina.core.buffer.IoBuffer;
35  import org.apache.mina.core.file.DefaultFileRegion;
36  import org.apache.mina.core.filterchain.IoFilterChain;
37  import org.apache.mina.core.future.CloseFuture;
38  import org.apache.mina.core.future.DefaultCloseFuture;
39  import org.apache.mina.core.future.DefaultReadFuture;
40  import org.apache.mina.core.future.DefaultWriteFuture;
41  import org.apache.mina.core.future.IoFutureListener;
42  import org.apache.mina.core.future.ReadFuture;
43  import org.apache.mina.core.future.WriteFuture;
44  import org.apache.mina.core.service.AbstractIoService;
45  import org.apache.mina.core.service.IoAcceptor;
46  import org.apache.mina.core.service.IoProcessor;
47  import org.apache.mina.core.service.IoService;
48  import org.apache.mina.core.service.TransportMetadata;
49  import org.apache.mina.core.write.DefaultWriteRequest;
50  import org.apache.mina.core.write.WriteException;
51  import org.apache.mina.core.write.WriteRequest;
52  import org.apache.mina.core.write.WriteRequestQueue;
53  import org.apache.mina.core.write.WriteTimeoutException;
54  import org.apache.mina.core.write.WriteToClosedSessionException;
55  import org.apache.mina.util.CircularQueue;
56  import org.apache.mina.util.ExceptionMonitor;
57  
58  
59  /**
60   * Base implementation of {@link IoSession}.
61   *
62   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
63   */
64  public abstract class AbstractIoSession implements IoSession {
65  
66      private static final AttributeKey READY_READ_FUTURES_KEY =
67          new AttributeKey(AbstractIoSession.class, "readyReadFutures");
68      
69      private static final AttributeKey WAITING_READ_FUTURES_KEY =
70          new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
71  
72      private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
73          new IoFutureListener<CloseFuture>() {
74              public void operationComplete(CloseFuture future) {
75                  AbstractIoSession session = (AbstractIoSession) future.getSession();
76                  session.scheduledWriteBytes.set(0);
77                  session.scheduledWriteMessages.set(0);
78                  session.readBytesThroughput = 0;
79                  session.readMessagesThroughput = 0;
80                  session.writtenBytesThroughput = 0;
81                  session.writtenMessagesThroughput = 0;
82              }
83      };
84  
85      /**
86       * An internal write request object that triggers session close.
87       * @see #writeRequestQueue
88       */
89      private static final WriteRequest CLOSE_REQUEST =
90          new DefaultWriteRequest(new Object());
91  
92      private final Object lock = new Object();
93  
94      private IoSessionAttributeMap attributes;
95      private WriteRequestQueue writeRequestQueue;
96      private WriteRequest currentWriteRequest;
97      
98      // The Session creation's time */
99      private final long creationTime;
100 
101     /** An id generator guaranteed to generate unique IDs for the session */
102     private static AtomicLong idGenerator = new AtomicLong(0);
103     
104     /** The session ID */
105     private long sessionId;
106     
107     /**
108      * A future that will be set 'closed' when the connection is closed.
109      */
110     private final CloseFuture closeFuture = new DefaultCloseFuture(this);
111 
112     private volatile boolean closing;
113     
114     // traffic control
115     private boolean readSuspended=false;
116     private boolean writeSuspended=false;
117 
118     // Status variables
119     private final AtomicBoolean scheduledForFlush = new AtomicBoolean();
120     private final AtomicInteger scheduledWriteBytes = new AtomicInteger();
121     private final AtomicInteger scheduledWriteMessages = new AtomicInteger();
122 
123     private long readBytes;
124     private long writtenBytes;
125     private long readMessages;
126     private long writtenMessages;
127     private long lastReadTime;
128     private long lastWriteTime;
129 
130     private long lastThroughputCalculationTime;
131     private long lastReadBytes;
132     private long lastWrittenBytes;
133     private long lastReadMessages;
134     private long lastWrittenMessages;
135     private double readBytesThroughput;
136     private double writtenBytesThroughput;
137     private double readMessagesThroughput;
138     private double writtenMessagesThroughput;
139 
140     private AtomicInteger idleCountForBoth = new AtomicInteger();
141     private AtomicInteger idleCountForRead = new AtomicInteger();
142     private AtomicInteger idleCountForWrite = new AtomicInteger();
143 
144     private long lastIdleTimeForBoth;
145     private long lastIdleTimeForRead;
146     private long lastIdleTimeForWrite;
147 
148     private boolean deferDecreaseReadBuffer = true;
149 
150     /**
151      * TODO Add method documentation
152      */
153     protected AbstractIoSession() {
154         // Initialize all the Session counters to the current time 
155         long currentTime = System.currentTimeMillis();
156         creationTime = currentTime;
157         lastThroughputCalculationTime = currentTime;
158         lastReadTime = currentTime;
159         lastWriteTime = currentTime;
160         lastIdleTimeForBoth = currentTime;
161         lastIdleTimeForRead = currentTime;
162         lastIdleTimeForWrite = currentTime;
163         
164         // TODO add documentation
165         closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
166         
167         // Set a new ID for this session
168         sessionId = idGenerator.incrementAndGet();
169     }
170 
171     /**
172      * {@inheritDoc}
173      * 
174      * We use an AtomicLong to guarantee that the session ID are
175      * unique.
176      */
177     public final long getId() {
178         return sessionId;
179     }
180 
181     /**
182      * TODO Add method documentation
183      */
184     public abstract IoProcessor getProcessor();
185 
186     /**
187      * {@inheritDoc}
188      */
189     public final boolean isConnected() {
190         return !closeFuture.isClosed();
191     }
192 
193     /**
194      * {@inheritDoc}
195      */
196     public final boolean isClosing() {
197         return closing || closeFuture.isClosed();
198     }
199 
200     /**
201      * {@inheritDoc}
202      */
203     public final CloseFuture getCloseFuture() {
204         return closeFuture;
205     }
206 
207     /**
208      * TODO Add method documentation
209      */
210     public final boolean isScheduledForFlush() {
211         return scheduledForFlush.get();
212     }
213 
214     /**
215      * TODO Add method documentation
216      */
217     public final boolean setScheduledForFlush(boolean flag) {
218         if (flag) {
219             // If the current tag is set to false, switch it to true 
220             return scheduledForFlush.compareAndSet(false, true);
221         }
222         
223         scheduledForFlush.set(false);
224         return true;
225     }
226 
227     /**
228      * {@inheritDoc}
229      */
230     public final CloseFuture close(boolean rightNow) {
231         if (rightNow) {
232             return close();
233         }
234         
235         return closeOnFlush();
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     public final CloseFuture close() {
242         synchronized (lock) {
243             if (isClosing()) {
244                 return closeFuture;
245             }
246             
247             closing = true;
248         }
249 
250         getFilterChain().fireFilterClose();
251         return closeFuture;
252     }
253 
254     private final CloseFuture closeOnFlush() {
255         getWriteRequestQueue().offer(this, CLOSE_REQUEST);
256         getProcessor().flush(this);
257         return closeFuture;
258     }
259 
260     /**
261      * {@inheritDoc}
262      */
263     public final ReadFuture read() {
264         if (!getConfig().isUseReadOperation()) {
265             throw new IllegalStateException("useReadOperation is not enabled.");
266         }
267 
268         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
269         ReadFuture future;
270         synchronized (readyReadFutures) {
271             future = readyReadFutures.poll();
272             if (future != null) {
273                 if (future.isClosed()) {
274                     // Let other readers get notified.
275                     readyReadFutures.offer(future);
276                 }
277             } else {
278                 future = new DefaultReadFuture(this);
279                 getWaitingReadFutures().offer(future);
280             }
281         }
282 
283         return future;
284     }
285 
286     /**
287      * TODO Add method documentation
288      */
289     public final void offerReadFuture(Object message) {
290         newReadFuture().setRead(message);
291     }
292 
293     /**
294      * TODO Add method documentation
295      */
296     public final void offerFailedReadFuture(Throwable exception) {
297         newReadFuture().setException(exception);
298     }
299 
300     /**
301      * TODO Add method documentation
302      */
303     public final void offerClosedReadFuture() {
304         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
305         synchronized (readyReadFutures) {
306             newReadFuture().setClosed();
307         }
308     }
309 
310     /**
311      * TODO Add method documentation
312      */
313     private ReadFuture newReadFuture() {
314         Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
315         Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
316         ReadFuture future;
317         synchronized (readyReadFutures) {
318             future = waitingReadFutures.poll();
319             if (future == null) {
320                 future = new DefaultReadFuture(this);
321                 readyReadFutures.offer(future);
322             }
323         }
324         return future;
325     }
326 
327     /**
328      * TODO Add method documentation
329      */
330     private Queue<ReadFuture> getReadyReadFutures() {
331         Queue<ReadFuture> readyReadFutures =
332             (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
333         if (readyReadFutures == null) {
334             readyReadFutures = new CircularQueue<ReadFuture>();
335 
336             Queue<ReadFuture> oldReadyReadFutures =
337                 (Queue<ReadFuture>) setAttributeIfAbsent(
338                         READY_READ_FUTURES_KEY, readyReadFutures);
339             if (oldReadyReadFutures != null) {
340                 readyReadFutures = oldReadyReadFutures;
341             }
342         }
343         return readyReadFutures;
344     }
345 
346     /**
347      * TODO Add method documentation
348      */
349     private Queue<ReadFuture> getWaitingReadFutures() {
350         Queue<ReadFuture> waitingReadyReadFutures =
351             (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES_KEY);
352         if (waitingReadyReadFutures == null) {
353             waitingReadyReadFutures = new CircularQueue<ReadFuture>();
354 
355             Queue<ReadFuture> oldWaitingReadyReadFutures =
356                 (Queue<ReadFuture>) setAttributeIfAbsent(
357                         WAITING_READ_FUTURES_KEY, waitingReadyReadFutures);
358             if (oldWaitingReadyReadFutures != null) {
359                 waitingReadyReadFutures = oldWaitingReadyReadFutures;
360             }
361         }
362         return waitingReadyReadFutures;
363     }
364 
365     /**
366      * {@inheritDoc}
367      */
368     public WriteFuture write(Object message) {
369         return write(message, null);
370     }
371 
372     /**
373      * {@inheritDoc}
374      */
375     public WriteFuture write(Object message, SocketAddress remoteAddress) {
376         if (message == null) {
377             throw new NullPointerException("message");
378         }
379 
380         // We can't send a message to a connected session if we don't have 
381         // the remote address
382         if (!getTransportMetadata().isConnectionless() &&
383                 remoteAddress != null) {
384             throw new UnsupportedOperationException();
385         }
386 
387         
388         // If the session has been closed or is closing, we can't either
389         // send a message to the remote side. We generate a future
390         // containing an exception.
391         if (isClosing() || !isConnected()) {
392             WriteFuture future = new DefaultWriteFuture(this);
393             WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
394             WriteException writeException = new WriteToClosedSessionException(request);
395             future.setException(writeException);
396             return future;
397         }
398 
399         FileChannel openedFileChannel = null;
400         
401         // TODO: remove this code as soon as we use InputStream
402         // instead of Object for the message.
403         try {
404             if (message instanceof IoBuffer
405                     && !((IoBuffer) message).hasRemaining()) {
406                 // Nothing to write : probably an error in the user code
407                 throw new IllegalArgumentException(
408                 "message is empty. Forgot to call flip()?");
409             } else if (message instanceof FileChannel) {
410                 FileChannel fileChannel = (FileChannel) message;
411                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
412             } else if (message instanceof File) {
413                 File file = (File) message;
414                 openedFileChannel = new FileInputStream(file).getChannel();
415                 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
416             }
417         } catch (IOException e) {
418             ExceptionMonitor.getInstance().exceptionCaught(e);
419             return DefaultWriteFuture.newNotWrittenFuture(this, e);
420         }
421 
422         // Now, we can write the message. First, create a future
423         WriteFuture writeFuture = new DefaultWriteFuture(this);
424         WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
425         
426         // Then, get the chain and inject the WriteRequest into it
427         IoFilterChain filterChain = getFilterChain();
428         filterChain.fireFilterWrite(writeRequest);
429 
430         // TODO : This is not our business ! The caller has created a FileChannel,
431         // he has to close it !
432         if (openedFileChannel != null) {
433             // If we opened a FileChannel, it needs to be closed when the write has completed
434             final FileChannel finalChannel = openedFileChannel;
435             writeFuture.addListener(new IoFutureListener<WriteFuture>() {
436                 public void operationComplete(WriteFuture future) {
437                     try {
438                         finalChannel.close();
439                     } catch (IOException e) {
440                         ExceptionMonitor.getInstance().exceptionCaught(e);
441                     }
442                 }
443             });
444         }
445 
446         // Return the WriteFuture.
447         return writeFuture;
448     }
449 
450     /**
451      * {@inheritDoc}
452      */
453     public final Object getAttachment() {
454         return getAttribute("");
455     }
456 
457     /**
458      * {@inheritDoc}
459      */
460     public final Object setAttachment(Object attachment) {
461         return setAttribute("", attachment);
462     }
463 
464     /**
465      * {@inheritDoc}
466      */
467     public final Object getAttribute(Object key) {
468         return getAttribute(key, null);
469     }
470 
471     /**
472      * {@inheritDoc}
473      */
474     public final Object getAttribute(Object key, Object defaultValue) {
475         return attributes.getAttribute(this, key, defaultValue);
476     }
477 
478     /**
479      * {@inheritDoc}
480      */
481     public final Object setAttribute(Object key, Object value) {
482         return attributes.setAttribute(this, key, value);
483     }
484 
485     /**
486      * {@inheritDoc}
487      */
488     public final Object setAttribute(Object key) {
489         return setAttribute(key, Boolean.TRUE);
490     }
491 
492     /**
493      * {@inheritDoc}
494      */
495     public final Object setAttributeIfAbsent(Object key, Object value) {
496         return attributes.setAttributeIfAbsent(this, key, value);
497     }
498 
499     /**
500      * {@inheritDoc}
501      */
502     public final Object setAttributeIfAbsent(Object key) {
503         return setAttributeIfAbsent(key, Boolean.TRUE);
504     }
505 
506     /**
507      * {@inheritDoc}
508      */
509     public final Object removeAttribute(Object key) {
510         return attributes.removeAttribute(this, key);
511     }
512 
513     /**
514      * {@inheritDoc}
515      */
516     public final boolean removeAttribute(Object key, Object value) {
517         return attributes.removeAttribute(this, key, value);
518     }
519 
520     /**
521      * {@inheritDoc}
522      */
523     public final boolean replaceAttribute(Object key, Object oldValue, Object newValue) {
524         return attributes.replaceAttribute(this, key, oldValue, newValue);
525     }
526 
527     /**
528      * {@inheritDoc}
529      */
530     public final boolean containsAttribute(Object key) {
531         return attributes.containsAttribute(this, key);
532     }
533 
534     /**
535      * {@inheritDoc}
536      */
537     public final Set<Object> getAttributeKeys() {
538         return attributes.getAttributeKeys(this);
539     }
540 
541     /**
542      * TODO Add method documentation
543      */
544     public final IoSessionAttributeMap getAttributeMap() {
545         return attributes;
546     }
547 
548     /**
549      * TODO Add method documentation
550      */
551     public final void setAttributeMap(IoSessionAttributeMap attributes) {
552         this.attributes = attributes;
553     }
554 
555     /**
556      * Create a new close aware write queue, based on the given write queue.
557      * 
558      * @param writeRequestQueue The write request queue
559      */
560     public final void setWriteRequestQueue(WriteRequestQueue writeRequestQueue) {
561         this.writeRequestQueue =
562             new CloseAwareWriteQueue(writeRequestQueue);
563     }
564 
565 
566     /**
567      * {@inheritDoc}
568      */
569     public final void suspendRead() {
570         readSuspended = true;
571         if (isClosing() || !isConnected()) {
572             return;
573         }
574         getProcessor().updateTrafficControl(this);
575     }
576 
577     /**
578      * {@inheritDoc}
579      */
580     public final void suspendWrite() {
581         writeSuspended = true;
582         if (isClosing() || !isConnected()) {
583             return;
584         }
585         getProcessor().updateTrafficControl(this);
586     }
587 
588     /**
589      * {@inheritDoc}
590      */
591     @SuppressWarnings("unchecked")
592     public final void resumeRead() {
593         readSuspended = false;
594         if (isClosing() || !isConnected()) {
595             return;
596         }
597         getProcessor().updateTrafficControl(this);
598     }
599 
600     /**
601      * {@inheritDoc}
602      */
603     @SuppressWarnings("unchecked")
604     public final void resumeWrite() {
605         writeSuspended = false;
606         if (isClosing() || !isConnected()) {
607             return;
608         }
609         getProcessor().updateTrafficControl(this);
610     }
611 
612     /**
613      * {@inheritDoc}
614      */
615     public boolean isReadSuspended() {
616         return readSuspended;
617     }
618 
619     /**
620      * {@inheritDoc}
621      */
622     public boolean isWriteSuspended() {
623         return writeSuspended; 
624     }
625     
626     /**
627      * {@inheritDoc}
628      */
629     public final long getReadBytes() {
630         return readBytes;
631     }
632 
633     /**
634      * {@inheritDoc}
635      */
636     public final long getWrittenBytes() {
637         return writtenBytes;
638     }
639 
640     /**
641      * {@inheritDoc}
642      */
643     public final long getReadMessages() {
644         return readMessages;
645     }
646 
647     /**
648      * {@inheritDoc}
649      */
650     public final long getWrittenMessages() {
651         return writtenMessages;
652     }
653 
654     /**
655      * {@inheritDoc}
656      */
657     public final double getReadBytesThroughput() {
658         return readBytesThroughput;
659     }
660 
661     /**
662      * {@inheritDoc}
663      */
664     public final double getWrittenBytesThroughput() {
665         return writtenBytesThroughput;
666     }
667 
668     /**
669      * {@inheritDoc}
670      */
671     public final double getReadMessagesThroughput() {
672         return readMessagesThroughput;
673     }
674 
675     /**
676      * {@inheritDoc}
677      */
678     public final double getWrittenMessagesThroughput() {
679         return writtenMessagesThroughput;
680     }
681 
682     /**
683      * {@inheritDoc}
684      */
685     public final void updateThroughput(long currentTime, boolean force) {
686         int interval = (int) (currentTime - lastThroughputCalculationTime);
687 
688         long minInterval = getConfig().getThroughputCalculationIntervalInMillis();
689         if (minInterval == 0 || interval < minInterval) {
690             if (!force) {
691                 return;
692             }
693         }
694 
695         readBytesThroughput = (readBytes - lastReadBytes) * 1000.0 / interval;
696         writtenBytesThroughput = (writtenBytes - lastWrittenBytes) * 1000.0 / interval;
697         readMessagesThroughput = (readMessages - lastReadMessages) * 1000.0 / interval;
698         writtenMessagesThroughput = (writtenMessages - lastWrittenMessages) * 1000.0 / interval;
699 
700         lastReadBytes = readBytes;
701         lastWrittenBytes = writtenBytes;
702         lastReadMessages = readMessages;
703         lastWrittenMessages = writtenMessages;
704 
705         lastThroughputCalculationTime = currentTime;
706     }
707 
708     /**
709      * {@inheritDoc}
710      */
711     public final long getScheduledWriteBytes() {
712         return scheduledWriteBytes.get();
713     }
714 
715     /**
716      * {@inheritDoc}
717      */
718     public final int getScheduledWriteMessages() {
719         return scheduledWriteMessages.get();
720     }
721 
722     /**
723      * TODO Add method documentation
724      */
725     protected void setScheduledWriteBytes(int byteCount){
726         scheduledWriteBytes.set(byteCount);
727     }
728 
729     /**
730      * TODO Add method documentation
731      */
732     protected void setScheduledWriteMessages(int messages) {
733         scheduledWriteMessages.set(messages);
734     }
735 
736     /**
737      * TODO Add method documentation
738      */
739     public final void increaseReadBytes(long increment, long currentTime) {
740         if (increment <= 0) {
741             return;
742         }
743 
744         readBytes += increment;
745         lastReadTime = currentTime;
746         idleCountForBoth.set(0);
747         idleCountForRead.set(0);
748 
749         if (getService() instanceof AbstractIoService) {
750             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
751         }
752     }
753 
754     /**
755      * TODO Add method documentation
756      */
757     public final void increaseReadMessages(long currentTime) {
758         readMessages++;
759         lastReadTime = currentTime;
760         idleCountForBoth.set(0);
761         idleCountForRead.set(0);
762 
763         if (getService() instanceof AbstractIoService) {
764             ((AbstractIoService) getService()).getStatistics().increaseReadMessages(currentTime);
765         }
766     }
767 
768     /**
769      * TODO Add method documentation
770      */
771     public final void increaseWrittenBytes(int increment, long currentTime) {
772         if (increment <= 0) {
773             return;
774         }
775 
776         writtenBytes += increment;
777         lastWriteTime = currentTime;
778         idleCountForBoth.set(0);
779         idleCountForWrite.set(0);
780 
781         if (getService() instanceof AbstractIoService) {
782             ((AbstractIoService) getService()).getStatistics().increaseWrittenBytes(increment, currentTime);
783         }
784 
785         increaseScheduledWriteBytes(-increment);
786     }
787 
788     /**
789      * TODO Add method documentation
790      */
791     public final void increaseWrittenMessages(
792             WriteRequest request, long currentTime) {
793         Object message = request.getMessage();
794         if (message instanceof IoBuffer) {
795             IoBuffer b = (IoBuffer) message;
796             if (b.hasRemaining()) {
797                 return;
798             }
799         }
800 
801         writtenMessages++;
802         lastWriteTime = currentTime;
803         if (getService() instanceof AbstractIoService) {
804             ((AbstractIoService) getService()).getStatistics().increaseWrittenMessages(currentTime);
805         }
806 
807         decreaseScheduledWriteMessages();
808     }
809 
810     /**
811      * TODO Add method documentation
812      */
813     public final void increaseScheduledWriteBytes(int increment) {
814         scheduledWriteBytes.addAndGet(increment);
815         if (getService() instanceof AbstractIoService) {
816             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteBytes(increment);
817         }
818     }
819 
820     /**
821      * TODO Add method documentation
822      */
823     public final void increaseScheduledWriteMessages() {
824         scheduledWriteMessages.incrementAndGet();
825         if (getService() instanceof AbstractIoService) {
826             ((AbstractIoService) getService()).getStatistics().increaseScheduledWriteMessages();
827         }
828     }
829 
830     /**
831      * TODO Add method documentation
832      */
833     private void decreaseScheduledWriteMessages() {
834         scheduledWriteMessages.decrementAndGet();
835         if (getService() instanceof AbstractIoService) {
836             ((AbstractIoService) getService()).getStatistics().decreaseScheduledWriteMessages();
837         }
838     }
839 
840     /**
841      * TODO Add method documentation
842      */
843     public final void decreaseScheduledBytesAndMessages(WriteRequest request) {
844         Object message = request.getMessage();
845         if (message instanceof IoBuffer) {
846             IoBuffer b = (IoBuffer) message;
847             if (b.hasRemaining()) {
848                 increaseScheduledWriteBytes(-((IoBuffer) message).remaining());
849             } else {
850                 decreaseScheduledWriteMessages();
851             }
852         } else {
853             decreaseScheduledWriteMessages();
854         }
855     }
856 
857     /**
858      * {@inheritDoc}
859      */
860     public final WriteRequestQueue getWriteRequestQueue() {
861         if (writeRequestQueue == null) {
862             throw new IllegalStateException();
863         }
864         return writeRequestQueue;
865     }
866 
867     /**
868      * {@inheritDoc}
869      */
870     public final WriteRequest getCurrentWriteRequest() {
871         return currentWriteRequest;
872     }
873 
874     /**
875      * {@inheritDoc}
876      */
877     public final Object getCurrentWriteMessage() {
878         WriteRequest req = getCurrentWriteRequest();
879         if (req == null) {
880             return null;
881         }
882         return req.getMessage();
883     }
884 
885     /**
886      * {@inheritDoc}
887      */
888     public final void setCurrentWriteRequest(WriteRequest currentWriteRequest) {
889         this.currentWriteRequest = currentWriteRequest;
890     }
891 
892     /**
893      * TODO Add method documentation
894      */
895     public final void increaseReadBufferSize() {
896         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
897         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
898             getConfig().setReadBufferSize(newReadBufferSize);
899         } else {
900             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
901         }
902 
903         deferDecreaseReadBuffer = true;
904     }
905 
906     /**
907      * TODO Add method documentation
908      */
909     public final void decreaseReadBufferSize() {
910         if (deferDecreaseReadBuffer) {
911             deferDecreaseReadBuffer = false;
912             return;
913         }
914 
915         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
916             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
917         }
918 
919         deferDecreaseReadBuffer = true;
920     }
921 
922     /**
923      * {@inheritDoc}
924      */
925     public final long getCreationTime() {
926         return creationTime;
927     }
928 
929     /**
930      * {@inheritDoc}
931      */
932     public final long getLastIoTime() {
933         return Math.max(lastReadTime, lastWriteTime);
934     }
935 
936     /**
937      * {@inheritDoc}
938      */
939     public final long getLastReadTime() {
940         return lastReadTime;
941     }
942 
943     /**
944      * {@inheritDoc}
945      */
946     public final long getLastWriteTime() {
947         return lastWriteTime;
948     }
949 
950     /**
951      * {@inheritDoc}
952      */
953     public final boolean isIdle(IdleStatus status) {
954         if (status == IdleStatus.BOTH_IDLE) {
955             return idleCountForBoth.get() > 0;
956         }
957 
958         if (status == IdleStatus.READER_IDLE) {
959             return idleCountForRead.get() > 0;
960         }
961 
962         if (status == IdleStatus.WRITER_IDLE) {
963             return idleCountForWrite.get() > 0;
964         }
965 
966         throw new IllegalArgumentException("Unknown idle status: " + status);
967     }
968 
969     /**
970      * {@inheritDoc}
971      */
972     public final boolean isBothIdle() {
973         return isIdle(IdleStatus.BOTH_IDLE);
974     }
975 
976     /**
977      * {@inheritDoc}
978      */
979     public final boolean isReaderIdle() {
980         return isIdle(IdleStatus.READER_IDLE);
981     }
982 
983     /**
984      * {@inheritDoc}
985      */
986     public final boolean isWriterIdle() {
987         return isIdle(IdleStatus.WRITER_IDLE);
988     }
989 
990     /**
991      * {@inheritDoc}
992      */
993     public final int getIdleCount(IdleStatus status) {
994         if (getConfig().getIdleTime(status) == 0) {
995             if (status == IdleStatus.BOTH_IDLE) {
996                 idleCountForBoth.set(0);
997             }
998 
999             if (status == IdleStatus.READER_IDLE) {
1000                 idleCountForRead.set(0);
1001             }
1002 
1003             if (status == IdleStatus.WRITER_IDLE) {
1004                 idleCountForWrite.set(0);
1005             }
1006         }
1007 
1008         if (status == IdleStatus.BOTH_IDLE) {
1009             return idleCountForBoth.get();
1010         }
1011 
1012         if (status == IdleStatus.READER_IDLE) {
1013             return idleCountForRead.get();
1014         }
1015 
1016         if (status == IdleStatus.WRITER_IDLE) {
1017             return idleCountForWrite.get();
1018         }
1019 
1020         throw new IllegalArgumentException("Unknown idle status: " + status);
1021     }
1022 
1023     /**
1024      * {@inheritDoc}
1025      */
1026     public final long getLastIdleTime(IdleStatus status) {
1027         if (status == IdleStatus.BOTH_IDLE) {
1028             return lastIdleTimeForBoth;
1029         }
1030 
1031         if (status == IdleStatus.READER_IDLE) {
1032             return lastIdleTimeForRead;
1033         }
1034 
1035         if (status == IdleStatus.WRITER_IDLE) {
1036             return lastIdleTimeForWrite;
1037         }
1038 
1039         throw new IllegalArgumentException("Unknown idle status: " + status);
1040     }
1041 
1042     /**
1043      * TODO Add method documentation
1044      */
1045     public final void increaseIdleCount(IdleStatus status, long currentTime) {
1046         if (status == IdleStatus.BOTH_IDLE) {
1047             idleCountForBoth.incrementAndGet();
1048             lastIdleTimeForBoth = currentTime;
1049         } else if (status == IdleStatus.READER_IDLE) {
1050             idleCountForRead.incrementAndGet();
1051             lastIdleTimeForRead = currentTime;
1052         } else if (status == IdleStatus.WRITER_IDLE) {
1053             idleCountForWrite.incrementAndGet();
1054             lastIdleTimeForWrite = currentTime;
1055         } else {
1056             throw new IllegalArgumentException("Unknown idle status: " + status);
1057         }
1058     }
1059 
1060     /**
1061      * {@inheritDoc}
1062      */
1063     public final int getBothIdleCount() {
1064         return getIdleCount(IdleStatus.BOTH_IDLE);
1065     }
1066 
1067     /**
1068      * {@inheritDoc}
1069      */
1070     public final long getLastBothIdleTime() {
1071         return getLastIdleTime(IdleStatus.BOTH_IDLE);
1072     }
1073 
1074     /**
1075      * {@inheritDoc}
1076      */
1077     public final long getLastReaderIdleTime() {
1078         return getLastIdleTime(IdleStatus.READER_IDLE);
1079     }
1080 
1081     /**
1082      * {@inheritDoc}
1083      */
1084     public final long getLastWriterIdleTime() {
1085         return getLastIdleTime(IdleStatus.WRITER_IDLE);
1086     }
1087 
1088     /**
1089      * {@inheritDoc}
1090      */
1091     public final int getReaderIdleCount() {
1092         return getIdleCount(IdleStatus.READER_IDLE);
1093     }
1094 
1095     /**
1096      * {@inheritDoc}
1097      */
1098     public final int getWriterIdleCount() {
1099         return getIdleCount(IdleStatus.WRITER_IDLE);
1100     }
1101 
1102     /**
1103      * {@inheritDoc}
1104      */
1105     public SocketAddress getServiceAddress() {
1106         IoService service = getService();
1107         if (service instanceof IoAcceptor) {
1108             return ((IoAcceptor) service).getLocalAddress();
1109         }
1110         
1111         return getRemoteAddress();
1112     }
1113 
1114     /**
1115      * {@inheritDoc}
1116      */
1117     @Override
1118     public final int hashCode() {
1119         return super.hashCode();
1120     }
1121 
1122     /**
1123      * {@inheritDoc}
1124      * TODO This is a ridiculous implementation. Need to be replaced.
1125      */
1126     @Override
1127     public final boolean equals(Object o) {
1128         return super.equals(o);
1129     }
1130 
1131     /**
1132      * {@inheritDoc}
1133      */
1134     @Override
1135     public String toString() {
1136         if (isConnected()||isClosing()) {
1137             try {
1138                 SocketAddress remote = getRemoteAddress();
1139                 SocketAddress local = getLocalAddress();
1140                 
1141                 if (getService() instanceof IoAcceptor) {
1142                     return "(" + getIdAsString() + ": " + getServiceName() + ", server, " +
1143                             remote + " => " + local + ')';
1144                 }
1145     
1146                 return "(" + getIdAsString() + ": " + getServiceName() + ", client, " +
1147                             local + " => " + remote + ')';
1148             } catch (Exception e) {
1149                 return "Session is disconnecting ...";
1150             }
1151         }
1152         
1153         return "Session disconnected ...";
1154     }
1155 
1156     /**
1157      * TODO Add method documentation
1158      */
1159     private String getIdAsString() {
1160         String id = Long.toHexString(getId()).toUpperCase();
1161 
1162         // Somewhat inefficient, but it won't happen that often
1163         // because an ID is often a big integer.
1164         while (id.length() < 8) {
1165             id = '0' + id; // padding
1166         }
1167         id = "0x" + id;
1168 
1169         return id;
1170     }
1171 
1172     /**
1173      * TODO Add method documentation
1174      */
1175     private String getServiceName() {
1176         TransportMetadata tm = getTransportMetadata();
1177         if (tm == null) {
1178             return "null";
1179         }
1180         
1181         return tm.getProviderName() + ' ' + tm.getName();
1182     }
1183 
1184     /**
1185      * Fires a {@link IoEventType#SESSION_IDLE} event to any applicable
1186      * sessions in the specified collection.
1187      *
1188      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1189      */
1190     public static void notifyIdleness(Iterator<? extends IoSession> sessions, long currentTime) {
1191         IoSession s = null;
1192         while (sessions.hasNext()) {
1193             s = sessions.next();
1194             notifyIdleSession(s, currentTime);
1195         }
1196     }
1197 
1198     /**
1199      * Fires a {@link IoEventType#SESSION_IDLE} event if applicable for the
1200      * specified {@code session}.
1201      *
1202      * @param currentTime the current time (i.e. {@link System#currentTimeMillis()})
1203      */
1204     public static void notifyIdleSession(IoSession session, long currentTime) {
1205         notifyIdleSession0(
1206                 session, currentTime,
1207                 session.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
1208                 IdleStatus.BOTH_IDLE, Math.max(
1209                     session.getLastIoTime(),
1210                     session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
1211 
1212         notifyIdleSession0(
1213                 session, currentTime,
1214                 session.getConfig().getIdleTimeInMillis(IdleStatus.READER_IDLE),
1215                 IdleStatus.READER_IDLE, Math.max(
1216                     session.getLastReadTime(),
1217                     session.getLastIdleTime(IdleStatus.READER_IDLE)));
1218 
1219         notifyIdleSession0(
1220                 session, currentTime,
1221                 session.getConfig().getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
1222                 IdleStatus.WRITER_IDLE, Math.max(
1223                     session.getLastWriteTime(),
1224                     session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
1225 
1226         notifyWriteTimeout(session, currentTime);
1227     }
1228 
1229     private static void notifyIdleSession0(
1230             IoSession session, long currentTime,
1231             long idleTime, IdleStatus status, long lastIoTime) {
1232         if (idleTime > 0 && lastIoTime != 0
1233                 && currentTime - lastIoTime >= idleTime) {
1234             session.getFilterChain().fireSessionIdle(status);
1235         }
1236     }
1237 
1238     private static void notifyWriteTimeout(
1239             IoSession session, long currentTime) {
1240 
1241         long writeTimeout = session.getConfig().getWriteTimeoutInMillis();
1242         if (writeTimeout > 0 &&
1243                 currentTime - session.getLastWriteTime() >= writeTimeout &&
1244                 !session.getWriteRequestQueue().isEmpty(session)) {
1245             WriteRequest request = session.getCurrentWriteRequest();
1246             if (request != null) {
1247                 session.setCurrentWriteRequest(null);
1248                 WriteTimeoutException cause = new WriteTimeoutException(request);
1249                 request.getFuture().setException(cause);
1250                 session.getFilterChain().fireExceptionCaught(cause);
1251                 // WriteException is an IOException, so we close the session.
1252                 session.close(true);
1253             }
1254         }
1255     }
1256 
1257     
1258     
1259     /**
1260      * A queue which handles the CLOSE request.
1261      * 
1262      * TODO : Check that when closing a session, all the pending
1263      * requests are correctly sent.
1264      */
1265     private class CloseAwareWriteQueue implements WriteRequestQueue {
1266 
1267         private final WriteRequestQueue q;
1268 
1269         /**
1270          * {@inheritDoc}
1271          */
1272         public CloseAwareWriteQueue(WriteRequestQueue q) {
1273             this.q = q;
1274         }
1275 
1276         /**
1277          * {@inheritDoc}
1278          */
1279         public synchronized WriteRequest poll(IoSession session) {
1280             WriteRequest answer = q.poll(session);
1281             
1282             if (answer == CLOSE_REQUEST) {
1283                 AbstractIoSession.this.close();
1284                 dispose(session);
1285                 answer = null;
1286             }
1287             
1288             return answer;
1289         }
1290 
1291         /**
1292          * {@inheritDoc}
1293          */
1294         public void offer(IoSession session, WriteRequest e) {
1295             q.offer(session, e);
1296         }
1297 
1298         /**
1299          * {@inheritDoc}
1300          */
1301         public boolean isEmpty(IoSession session) {
1302             return q.isEmpty(session);
1303         }
1304 
1305         /**
1306          * {@inheritDoc}
1307          */
1308         public void clear(IoSession session) {
1309             q.clear(session);
1310         }
1311 
1312         /**
1313          * {@inheritDoc}
1314          */
1315         public void dispose(IoSession session) {
1316             q.dispose(session);
1317         }
1318     }
1319 }