View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.Inet4Address;
24  import java.net.Inet6Address;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.SocketAddress;
28  import java.nio.channels.ClosedSelectorException;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.SelectionKey;
31  import java.nio.channels.Selector;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Queue;
39  import java.util.Set;
40  import java.util.concurrent.ConcurrentLinkedQueue;
41  import java.util.concurrent.Executor;
42  import java.util.concurrent.Semaphore;
43  
44  import org.apache.mina.core.RuntimeIoException;
45  import org.apache.mina.core.buffer.IoBuffer;
46  import org.apache.mina.core.service.AbstractIoAcceptor;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoProcessor;
49  import org.apache.mina.core.service.TransportMetadata;
50  import org.apache.mina.core.session.AbstractIoSession;
51  import org.apache.mina.core.session.ExpiringSessionRecycler;
52  import org.apache.mina.core.session.IoSession;
53  import org.apache.mina.core.session.IoSessionConfig;
54  import org.apache.mina.core.session.IoSessionRecycler;
55  import org.apache.mina.core.write.WriteRequest;
56  import org.apache.mina.core.write.WriteRequestQueue;
57  import org.apache.mina.transport.socket.DatagramAcceptor;
58  import org.apache.mina.transport.socket.DatagramSessionConfig;
59  import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
60  import org.apache.mina.util.ExceptionMonitor;
61  
62  /**
63   * {@link IoAcceptor} for datagram transport (UDP/IP).
64   *
65   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
66   * @org.apache.xbean.XBean
67   */
68  public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
    /**
     * The recycler installed by default, and re-installed when
     * {@link #setSessionRecycler(IoSessionRecycler)} is called with {@code null}.
     * It is used to retrieve an existing session, unless it's too old.
     */
    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();

    /**
     * The timeout, in milliseconds, handed to the selector's timed select by the
     * Acceptor loop. The loop has to leave the select regularly in order to
     * deal with idle sessions.
     */
    private static final long SELECT_TIMEOUT = 1000L;

    /**
     * A single-permit semaphore that prevents the selector from being woken up
     * before the Acceptor worker has started. The permit is taken in
     * startupAcceptor() and given back by the Acceptor at the start of run().
     */
    private final Semaphore lock = new Semaphore(1);

    /** Pending bind requests: filled by bindInternal(), drained by registerHandles() */
    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();

    /** Pending unbind requests: filled by unbind0(), drained by unregisterHandles() */
    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();

    /** Sessions waiting to be flushed: filled by scheduleFlush(), drained by flushSessions() */
    private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();

    /** The currently bound channels, keyed by the local address they are bound to */
    private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
            .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());

    /** The recycler in use; it is also the monitor guarding session lookup and creation */
    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;

    /** Completed with {@code true} by the Acceptor once destroy() has run during disposal */
    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();

    /**
     * Set to {@code true} once init() has succeeded and back to {@code false}
     * when the Acceptor tears the service down. The Acceptor loop runs while
     * this flag is {@code true}.
     */
    private volatile boolean selectable;

    /** The worker accepting incoming requests, or {@code null} when no worker is running */
    private Acceptor acceptor;

    /** The time (System.currentTimeMillis()) of the last idle-session check */
    private long lastIdleCheckTime;

    /** The Selector used by this acceptor; opened in init() and closed in destroy() */
    private volatile Selector selector;
106 
107     /**
108      * Creates a new instance.
109      */
110     public NioDatagramAcceptor() {
111         this(new DefaultDatagramSessionConfig(), null);
112     }
113 
114     /**
115      * Creates a new instance.
116      * 
117      * @param executor The executor to use
118      */
119     public NioDatagramAcceptor(Executor executor) {
120         this(new DefaultDatagramSessionConfig(), executor);
121     }
122 
123     /**
124      * Creates a new instance.
125      */
126     private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
127         super(sessionConfig, executor);
128 
129         try {
130             init();
131             selectable = true;
132         } catch (RuntimeException e) {
133             throw e;
134         } catch (Exception e) {
135             throw new RuntimeIoException("Failed to initialize.", e);
136         } finally {
137             if (!selectable) {
138                 try {
139                     destroy();
140                 } catch (Exception e) {
141                     ExceptionMonitor.getInstance().exceptionCaught(e);
142                 }
143             }
144         }
145     }
146 
147     /**
148      * This private class is used to accept incoming connection from
149      * clients. It's an infinite loop, which can be stopped when all
150      * the registered handles have been removed (unbound).
151      */
152     private class Acceptor implements Runnable {
153         @Override
154         public void run() {
155             int nHandles = 0;
156             lastIdleCheckTime = System.currentTimeMillis();
157 
158             // Release the lock
159             lock.release();
160 
161             while (selectable) {
162                 try {
163                     int selected = select(SELECT_TIMEOUT);
164 
165                     nHandles += registerHandles();
166 
167                     if (nHandles == 0) {
168                         try {
169                             lock.acquire();
170 
171                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
172                                 acceptor = null;
173                                 break;
174                             }
175                         } finally {
176                             lock.release();
177                         }
178                     }
179 
180                     if (selected > 0) {
181                         processReadySessions(selectedHandles());
182                     }
183 
184                     long currentTime = System.currentTimeMillis();
185                     flushSessions(currentTime);
186                     nHandles -= unregisterHandles();
187 
188                     notifyIdleSessions(currentTime);
189                 } catch (ClosedSelectorException cse) {
190                     // If the selector has been closed, we can exit the loop
191                     ExceptionMonitor.getInstance().exceptionCaught(cse);
192                     break;
193                 } catch (Exception e) {
194                     ExceptionMonitor.getInstance().exceptionCaught(e);
195 
196                     try {
197                         Thread.sleep(1000);
198                     } catch (InterruptedException e1) {
199                     }
200                 }
201             }
202 
203             if (selectable && isDisposing()) {
204                 selectable = false;
205                 try {
206                     destroy();
207                 } catch (Exception e) {
208                     ExceptionMonitor.getInstance().exceptionCaught(e);
209                 } finally {
210                     disposalFuture.setValue(true);
211                 }
212             }
213         }
214     }
215 
216     private int registerHandles() {
217         for (;;) {
218             AcceptorOperationFuture req = registerQueue.poll();
219 
220             if (req == null) {
221                 break;
222             }
223 
224             Map<SocketAddress, DatagramChannel> newHandles = new HashMap<>();
225             List<SocketAddress> localAddresses = req.getLocalAddresses();
226 
227             try {
228                 for (SocketAddress socketAddress : localAddresses) {
229                     DatagramChannel handle = open(socketAddress);
230                     newHandles.put(localAddress(handle), handle);
231                 }
232 
233                 boundHandles.putAll(newHandles);
234 
235                 getListeners().fireServiceActivated();
236                 req.setDone();
237 
238                 return newHandles.size();
239             } catch (Exception e) {
240                 req.setException(e);
241             } finally {
242                 // Roll back if failed to bind all addresses.
243                 if (req.getException() != null) {
244                     for (DatagramChannel handle : newHandles.values()) {
245                         try {
246                             close(handle);
247                         } catch (Exception e) {
248                             ExceptionMonitor.getInstance().exceptionCaught(e);
249                         }
250                     }
251 
252                     wakeup();
253                 }
254             }
255         }
256 
257         return 0;
258     }
259 
260     private void processReadySessions(Set<SelectionKey> handles) {
261         Iterator<SelectionKey> iterator = handles.iterator();
262 
263         while (iterator.hasNext()) {
264             SelectionKey key = iterator.next();
265             DatagramChannel handle = (DatagramChannel) key.channel();
266             iterator.remove();
267 
268             try {
269                 if (key.isValid() && key.isReadable()) {
270                     readHandle(handle);
271                 }
272 
273                 if (key.isValid() && key.isWritable()) {
274                     for (IoSession session : getManagedSessions().values()) {
275                         scheduleFlush((NioSession) session);
276                     }
277                 }
278             } catch (Exception e) {
279                 ExceptionMonitor.getInstance().exceptionCaught(e);
280             }
281         }
282     }
283 
284     private boolean scheduleFlush(NioSession session) {
285         // Set the schedule for flush flag if the session
286         // has not already be added to the flushingSessions
287         // queue
288         if (session.setScheduledForFlush(true)) {
289             flushingSessions.add(session);
290             return true;
291         } else {
292             return false;
293         }
294     }
295 
296     private void readHandle(DatagramChannel handle) throws Exception {
297         IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
298 
299         SocketAddress remoteAddress = receive(handle, readBuf);
300 
301         if (remoteAddress != null) {
302             IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
303 
304             readBuf.flip();
305 
306             if (!session.isReadSuspended()) {
307                 session.getFilterChain().fireMessageReceived(readBuf);
308             }
309         }
310     }
311 
312     private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
313         DatagramChannel handle = boundHandles.get(localAddress);
314 
315         if (handle == null) {
316             throw new IllegalArgumentException("Unknown local address: " + localAddress);
317         }
318 
319         IoSession session;
320 
321         synchronized (sessionRecycler) {
322             session = sessionRecycler.recycle(remoteAddress);
323 
324             if (session != null) {
325                 return session;
326             }
327 
328             // If a new session needs to be created.
329             NioSession newSession = newSession(this, handle, remoteAddress);
330             getSessionRecycler().put(newSession);
331             session = newSession;
332         }
333 
334         initSession(session, null, null);
335 
336         try {
337             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
338             getListeners().fireSessionCreated(session);
339         } catch (Exception e) {
340             ExceptionMonitor.getInstance().exceptionCaught(e);
341         }
342 
343         return session;
344     }
345 
346     private void flushSessions(long currentTime) {
347         for (;;) {
348             NioSession session = flushingSessions.poll();
349 
350             if (session == null) {
351                 break;
352             }
353 
354             // Reset the Schedule for flush flag for this session,
355             // as we are flushing it now
356             session.unscheduledForFlush();
357 
358             try {
359                 boolean flushedAll = flush(session, currentTime);
360 
361                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
362                     scheduleFlush(session);
363                 }
364             } catch (Exception e) {
365                 session.getFilterChain().fireExceptionCaught(e);
366             }
367         }
368     }
369 
    /**
     * Writes the pending requests of a session, starting with its current
     * write request if there is one, then polling its write request queue.
     * <p>
     * The loop ends when the queue is empty (the write interest is then
     * removed), or when a send writes nothing or the byte budget has been
     * reached (the write interest is then set and the request is kept as the
     * session's current write request). In all cases, the session's written
     * bytes counter is increased by the number of bytes accounted here.
     *
     * @param session the session to flush
     * @param currentTime the current time, in milliseconds
     * @return {@code true} if the queue has been emptied, {@code false} if
     *         the flush had to stop before that
     * @throws Exception if sending or updating the interest set fails
     */
    private boolean flush(NioSession session, long currentTime) throws Exception {
        final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
        // The byte budget of one flush : 1.5 times the max read buffer size
        final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
                + (session.getConfig().getMaxReadBufferSize() >>> 1);

        int writtenBytes = 0;

        try {
            for (;;) {
                // Resume the request left over by a previous flush, if any
                WriteRequest req = session.getCurrentWriteRequest();

                if (req == null) {
                    req = writeRequestQueue.poll(session);

                    if (req == null) {
                        // Nothing left to write
                        setInterestedInWrite(session, false);
                        break;
                    }

                    session.setCurrentWriteRequest(req);
                }

                IoBuffer buf = (IoBuffer) req.getMessage();

                if (buf.remaining() == 0) {
                    // Clear and fire event
                    session.setCurrentWriteRequest(null);
                    buf.reset();
                    session.getFilterChain().fireMessageSent(req);
                    continue;
                }

                // A request without destination is sent to the session's peer
                SocketAddress destination = req.getDestination();

                if (destination == null) {
                    destination = session.getRemoteAddress();
                }

                int localWrittenBytes = send(session, buf, destination);

                // NOTE(review): when the budget test is the one that fails,
                // localWrittenBytes is not added to writtenBytes - confirm
                // this is intended.
                if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
                    // Kernel buffer is full or wrote too much
                    setInterestedInWrite(session, true);

                    return false;
                } else {
                    setInterestedInWrite(session, false);

                    // Clear and fire event
                    session.setCurrentWriteRequest(null);
                    writtenBytes += localWrittenBytes;
                    buf.reset();
                    session.getFilterChain().fireMessageSent(req);
                }
            }
        } finally {
            session.increaseWrittenBytes(writtenBytes, currentTime);
        }

        return true;
    }
431 
432     private int unregisterHandles() {
433         int nHandles = 0;
434 
435         for (;;) {
436             AcceptorOperationFuture request = cancelQueue.poll();
437             if (request == null) {
438                 break;
439             }
440 
441             // close the channels
442             for (SocketAddress socketAddress : request.getLocalAddresses()) {
443                 DatagramChannel handle = boundHandles.remove(socketAddress);
444 
445                 if (handle == null) {
446                     continue;
447                 }
448 
449                 try {
450                     close(handle);
451                     wakeup(); // wake up again to trigger thread death
452                 } catch (Exception e) {
453                     ExceptionMonitor.getInstance().exceptionCaught(e);
454                 } finally {
455                     nHandles++;
456                 }
457             }
458 
459             request.setDone();
460         }
461 
462         return nHandles;
463     }
464 
465     private void notifyIdleSessions(long currentTime) {
466         // process idle sessions
467         if (currentTime - lastIdleCheckTime >= 1000) {
468             lastIdleCheckTime = currentTime;
469             AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
470         }
471     }
472 
473     /**
474      * Starts the inner Acceptor thread.
475      */
476     private void startupAcceptor() throws InterruptedException {
477         if (!selectable) {
478             registerQueue.clear();
479             cancelQueue.clear();
480             flushingSessions.clear();
481         }
482 
483         lock.acquire();
484 
485         if (acceptor == null) {
486             acceptor = new Acceptor();
487             executeWorker(acceptor);
488         } else {
489             lock.release();
490         }
491     }
492 
493     protected void init() throws Exception {
494         this.selector = Selector.open();
495     }
496 
    /**
     * {@inheritDoc}
     * <p>
     * This implementation is intentionally empty.
     */
    @Override
    public void add(NioSession session) {
        // Nothing to do for UDP
    }
504 
505     /**
506      * {@inheritDoc}
507      */
508     @Override
509     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
510         // Create a bind request as a Future operation. When the selector
511         // have handled the registration, it will signal this future.
512         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
513 
514         // adds the Registration request to the queue for the Workers
515         // to handle
516         registerQueue.add(request);
517 
518         // creates the Acceptor instance and has the local
519         // executor kick it off.
520         startupAcceptor();
521 
522         // As we just started the acceptor, we have to unblock the select()
523         // in order to process the bind request we just have added to the
524         // registerQueue.
525         try {
526             lock.acquire();
527 
528             // Wait a bit to give a chance to the Acceptor thread to do the select()
529             Thread.sleep(10);
530             wakeup();
531         } finally {
532             lock.release();
533         }
534 
535         // Now, we wait until this request is completed.
536         request.awaitUninterruptibly();
537 
538         if (request.getException() != null) {
539             throw request.getException();
540         }
541 
542         // Update the local addresses.
543         // setLocalAddresses() shouldn't be called from the worker thread
544         // because of deadlock.
545         Set<SocketAddress> newLocalAddresses = new HashSet<>();
546 
547         for (DatagramChannel handle : boundHandles.values()) {
548             newLocalAddresses.add(localAddress(handle));
549         }
550 
551         return newLocalAddresses;
552     }
553 
554     protected void close(DatagramChannel handle) throws Exception {
555         SelectionKey key = handle.keyFor(selector);
556 
557         if (key != null) {
558             key.cancel();
559         }
560 
561         handle.disconnect();
562         handle.close();
563     }
564 
565     protected void destroy() throws Exception {
566         if (selector != null) {
567             selector.close();
568         }
569     }
570 
    /**
     * {@inheritDoc}
     * <p>
     * Unbinds, makes sure an Acceptor worker is running, then wakes the
     * selector up. The selector itself is closed by the worker : when its
     * loop ends while the service is being disposed, it calls destroy() and
     * completes the disposal future.
     */
    @Override
    protected void dispose0() throws Exception {
        unbind();
        startupAcceptor();
        wakeup();
    }
580 
581     /**
582      * {@inheritDoc}
583      */
584     @Override
585     public void flush(NioSession session) {
586         if (scheduleFlush(session)) {
587             wakeup();
588         }
589     }
590 
591     @Override
592     public InetSocketAddress getDefaultLocalAddress() {
593         return (InetSocketAddress) super.getDefaultLocalAddress();
594     }
595 
596     @Override
597     public InetSocketAddress getLocalAddress() {
598         return (InetSocketAddress) super.getLocalAddress();
599     }
600 
601     /**
602      * {@inheritDoc}
603      */
604     @Override
605     public DatagramSessionConfig getSessionConfig() {
606         return (DatagramSessionConfig) sessionConfig;
607     }
608 
609     @Override
610     public final IoSessionRecycler getSessionRecycler() {
611         return sessionRecycler;
612     }
613 
614     @Override
615     public TransportMetadata getTransportMetadata() {
616         return NioDatagramSession.METADATA;
617     }
618 
619     protected boolean isReadable(DatagramChannel handle) {
620         SelectionKey key = handle.keyFor(selector);
621 
622         if ((key == null) || (!key.isValid())) {
623             return false;
624         }
625 
626         return key.isReadable();
627     }
628 
629     protected boolean isWritable(DatagramChannel handle) {
630         SelectionKey key = handle.keyFor(selector);
631 
632         if ((key == null) || (!key.isValid())) {
633             return false;
634         }
635 
636         return key.isWritable();
637     }
638 
639     protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
640         InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
641         InetAddress inetAddress = inetSocketAddress.getAddress();
642 
643         if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
644             // Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
645             // even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
646             // ANY address in the map.
647             byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
648             byte[] ipV4Address = new byte[4];
649 
650             System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);
651 
652             InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
653             return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
654         } else {
655             return inetSocketAddress;
656         }
657     }
658 
659     protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
660             SocketAddress remoteAddress) {
661         SelectionKey key = handle.keyFor(selector);
662 
663         if ((key == null) || (!key.isValid())) {
664             return null;
665         }
666 
667         NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
668         newSession.setSelectionKey(key);
669 
670         return newSession;
671     }
672 
673     /**
674      * {@inheritDoc}
675      */
676     @Override
677     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
678         if (isDisposing()) {
679             throw new IllegalStateException("The Acceptor is being disposed.");
680         }
681 
682         if (remoteAddress == null) {
683             throw new IllegalArgumentException("remoteAddress");
684         }
685 
686         synchronized (bindLock) {
687             if (!isActive()) {
688                 throw new IllegalStateException("Can't create a session from a unbound service.");
689             }
690 
691             try {
692                 return newSessionWithoutLock(remoteAddress, localAddress);
693             } catch (RuntimeException | Error e) {
694                 throw e;
695             } catch (Exception e) {
696                 throw new RuntimeIoException("Failed to create a session.", e);
697             }
698         }
699     }
700 
701     protected DatagramChannel open(SocketAddress localAddress) throws Exception {
702         final DatagramChannel ch = DatagramChannel.open();
703         boolean success = false;
704         try {
705             new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
706             ch.configureBlocking(false);
707 
708             try {
709                 ch.socket().bind(localAddress);
710             } catch (IOException ioe) {
711                 // Add some info regarding the address we try to bind to the
712                 // message
713                 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
714                         + ioe.getMessage();
715                 Exception e = new IOException(newMessage);
716                 e.initCause(ioe.getCause());
717 
718                 // And close the channel
719                 ch.close();
720 
721                 throw e;
722             }
723 
724             ch.register(selector, SelectionKey.OP_READ);
725             success = true;
726         } finally {
727             if (!success) {
728                 close(ch);
729             }
730         }
731 
732         return ch;
733     }
734 
735     protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
736         return handle.receive(buffer.buf());
737     }
738 
739     /**
740      * {@inheritDoc}
741      */
742     @Override
743     public void remove(NioSession session) {
744         getSessionRecycler().remove(session);
745         getListeners().fireSessionDestroyed(session);
746     }
747 
748     protected int select() throws Exception {
749         return selector.select();
750     }
751 
752     protected int select(long timeout) throws Exception {
753         return selector.select(timeout);
754     }
755 
756     protected Set<SelectionKey> selectedHandles() {
757         return selector.selectedKeys();
758     }
759 
760     protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
761         return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
762     }
763 
764     @Override
765     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
766         setDefaultLocalAddress((SocketAddress) localAddress);
767     }
768 
769     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
770         SelectionKey key = session.getSelectionKey();
771 
772         if (key == null) {
773             return;
774         }
775 
776         int newInterestOps = key.interestOps();
777 
778         if (isInterested) {
779             newInterestOps |= SelectionKey.OP_WRITE;
780         } else {
781             newInterestOps &= ~SelectionKey.OP_WRITE;
782         }
783 
784         key.interestOps(newInterestOps);
785     }
786 
787     @Override
788     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
789         synchronized (bindLock) {
790             if (isActive()) {
791                 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
792             }
793 
794             if (sessionRecycler == null) {
795                 sessionRecycler = DEFAULT_RECYCLER;
796             }
797 
798             this.sessionRecycler = sessionRecycler;
799         }
800     }
801 
802     /**
803      * {@inheritDoc}
804      */
805     @Override
806     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
807         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
808 
809         cancelQueue.add(request);
810         startupAcceptor();
811         wakeup();
812 
813         request.awaitUninterruptibly();
814 
815         if (request.getException() != null) {
816             throw request.getException();
817         }
818     }
819 
    /**
     * {@inheritDoc}
     * <p>
     * This implementation is intentionally empty.
     */
    @Override
    public void updateTrafficControl(NioSession session) {
        // Nothing to do
    }
827 
    /**
     * Wakes up this acceptor's selector, so that a select in progress (or the
     * next one) returns immediately.
     */
    protected void wakeup() {
        selector.wakeup();
    }
831 
832     /**
833      * {@inheritDoc}
834      */
835     @Override
836     public void write(NioSession session, WriteRequest writeRequest) {
837         // We will try to write the message directly
838         long currentTime = System.currentTimeMillis();
839         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
840         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
841                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
842 
843         int writtenBytes = 0;
844 
845         // Deal with the special case of a Message marker (no bytes in the request)
846         // We just have to return after having calle dthe messageSent event
847         IoBuffer buf = (IoBuffer) writeRequest.getMessage();
848 
849         if (buf.remaining() == 0) {
850             // Clear and fire event
851             session.setCurrentWriteRequest(null);
852             buf.reset();
853             session.getFilterChain().fireMessageSent(writeRequest);
854             return;
855         }
856 
857         // Now, write the data
858         try {
859             for (;;) {
860                 if (writeRequest == null) {
861                     writeRequest = writeRequestQueue.poll(session);
862 
863                     if (writeRequest == null) {
864                         setInterestedInWrite(session, false);
865                         break;
866                     }
867 
868                     session.setCurrentWriteRequest(writeRequest);
869                 }
870 
871                 buf = (IoBuffer) writeRequest.getMessage();
872 
873                 if (buf.remaining() == 0) {
874                     // Clear and fire event
875                     session.setCurrentWriteRequest(null);
876                     buf.reset();
877                     session.getFilterChain().fireMessageSent(writeRequest);
878                     continue;
879                 }
880 
881                 SocketAddress destination = writeRequest.getDestination();
882 
883                 if (destination == null) {
884                     destination = session.getRemoteAddress();
885                 }
886 
887                 int localWrittenBytes = send(session, buf, destination);
888 
889                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
890                     // Kernel buffer is full or wrote too much
891                     setInterestedInWrite(session, true);
892 
893                     session.getWriteRequestQueue().offer(session, writeRequest);
894                     scheduleFlush(session);
895                 } else {
896                     setInterestedInWrite(session, false);
897 
898                     // Clear and fire event
899                     session.setCurrentWriteRequest(null);
900                     writtenBytes += localWrittenBytes;
901                     buf.reset();
902                     session.getFilterChain().fireMessageSent(writeRequest);
903 
904                     break;
905                 }
906             }
907         } catch (Exception e) {
908             session.getFilterChain().fireExceptionCaught(e);
909         } finally {
910             session.increaseWrittenBytes(writtenBytes, currentTime);
911         }
912     }
913 }