View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.net.Inet4Address;
23  import java.net.Inet6Address;
24  import java.net.InetAddress;
25  import java.net.InetSocketAddress;
26  import java.net.SocketAddress;
27  import java.nio.channels.ClosedSelectorException;
28  import java.nio.channels.DatagramChannel;
29  import java.nio.channels.SelectionKey;
30  import java.nio.channels.Selector;
31  import java.util.Collections;
32  import java.util.HashMap;
33  import java.util.HashSet;
34  import java.util.Iterator;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.Queue;
38  import java.util.Set;
39  import java.util.concurrent.ConcurrentLinkedQueue;
40  import java.util.concurrent.Executor;
41  import java.util.concurrent.Semaphore;
42  
43  import org.apache.mina.core.RuntimeIoException;
44  import org.apache.mina.core.buffer.IoBuffer;
45  import org.apache.mina.core.service.AbstractIoAcceptor;
46  import org.apache.mina.core.service.IoAcceptor;
47  import org.apache.mina.core.service.IoProcessor;
48  import org.apache.mina.core.service.TransportMetadata;
49  import org.apache.mina.core.session.AbstractIoSession;
50  import org.apache.mina.core.session.ExpiringSessionRecycler;
51  import org.apache.mina.core.session.IoSession;
52  import org.apache.mina.core.session.IoSessionConfig;
53  import org.apache.mina.core.session.IoSessionRecycler;
54  import org.apache.mina.core.write.WriteRequest;
55  import org.apache.mina.core.write.WriteRequestQueue;
56  import org.apache.mina.transport.socket.DatagramAcceptor;
57  import org.apache.mina.transport.socket.DatagramSessionConfig;
58  import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
59  import org.apache.mina.util.ExceptionMonitor;
60  
61  /**
62   * {@link IoAcceptor} for datagram transport (UDP/IP).
63   *
64   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
65   * @org.apache.xbean.XBean
66   */
67  public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
68      /** 
69       * A session recycler that is used to retrieve an existing session, unless it's too old.
70       **/  
71      private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
72  
73      /**
74       * A timeout used for the select, as we need to get out to deal with idle
75       * sessions
76       */
77      private static final long SELECT_TIMEOUT = 1000L;
78  
79      /** A lock used to keep callers from waking up the selector before the Acceptor worker has started */
80      private final Semaphore lock = new Semaphore(1);
81  
82      /** A queue used to store the list of pending Binds */
83      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
84  
85      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
86  
87      private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<NioSession>();
88  
89      private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
90              .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
91  
92      private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
93  
94      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
95  
96      private volatile boolean selectable;
97  
98      /** The worker task responsible for accepting incoming requests */
99      private Acceptor acceptor;
100 
101     private long lastIdleCheckTime;
102 
103     /** The Selector used by this acceptor */
104     private volatile Selector selector;
105 
106     /**
107      * Creates a new instance with a {@link DefaultDatagramSessionConfig} and a {@code null} executor.
108      */
109     public NioDatagramAcceptor() {
110         this(new DefaultDatagramSessionConfig(), null);
111     }
112 
113     /**
114      * Creates a new instance with a {@link DefaultDatagramSessionConfig} and the given executor.
         *
         * @param executor the executor forwarded, together with the default configuration, to the private constructor
115      */
116     public NioDatagramAcceptor(Executor executor) {
117         this(new DefaultDatagramSessionConfig(), executor);
118     }
119 
120     /**
121      * Creates a new instance.
122      */
123     private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
124         super(sessionConfig, executor);
125 
126         try {
127             init();
128             selectable = true;
129         } catch (RuntimeException e) {
130             throw e;
131         } catch (Exception e) {
132             throw new RuntimeIoException("Failed to initialize.", e);
133         } finally {
134             if (!selectable) {
135                 try {
136                     destroy();
137                 } catch (Exception e) {
138                     ExceptionMonitor.getInstance().exceptionCaught(e);
139                 }
140             }
141         }
142     }
143 
144     /**
145      * This private class is used to accept incoming connection from
146      * clients. It's an infinite loop, which can be stopped when all
147      * the registered handles have been removed (unbound).
148      */
149     private class Acceptor implements Runnable {
150         public void run() {
151             int nHandles = 0;
152             lastIdleCheckTime = System.currentTimeMillis();
153 
154             // Release the lock
155             lock.release();
156 
157             while (selectable) {
158                 try {
159                     int selected = select(SELECT_TIMEOUT);
160 
161                     nHandles += registerHandles();
162 
163                     if (nHandles == 0) {
164                         try {
165                             lock.acquire();
166 
167                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
168                                 acceptor = null;
169                                 break;
170                             }
171                         } finally {
172                             lock.release();
173                         }
174                     }
175 
176                     if (selected > 0) {
177                         processReadySessions(selectedHandles());
178                     }
179 
180                     long currentTime = System.currentTimeMillis();
181                     flushSessions(currentTime);
182                     nHandles -= unregisterHandles();
183 
184                     notifyIdleSessions(currentTime);
185                 } catch (ClosedSelectorException cse) {
186                     // If the selector has been closed, we can exit the loop
187                     break;
188                 } catch (Exception e) {
189                     ExceptionMonitor.getInstance().exceptionCaught(e);
190 
191                     try {
192                         Thread.sleep(1000);
193                     } catch (InterruptedException e1) {
194                     }
195                 }
196             }
197 
198             if (selectable && isDisposing()) {
199                 selectable = false;
200                 try {
201                     destroy();
202                 } catch (Exception e) {
203                     ExceptionMonitor.getInstance().exceptionCaught(e);
204                 } finally {
205                     disposalFuture.setValue(true);
206                 }
207             }
208         }
209     }
210 
211     private int registerHandles() {
212         for (;;) {
213             AcceptorOperationFuture req = registerQueue.poll();
214 
215             if (req == null) {
216                 break;
217             }
218 
219             Map<SocketAddress, DatagramChannel> newHandles = new HashMap<SocketAddress, DatagramChannel>();
220             List<SocketAddress> localAddresses = req.getLocalAddresses();
221 
222             try {
223                 for (SocketAddress socketAddress : localAddresses) {
224                     DatagramChannel handle = open(socketAddress);
225                     newHandles.put(localAddress(handle), handle);
226                 }
227 
228                 boundHandles.putAll(newHandles);
229 
230                 getListeners().fireServiceActivated();
231                 req.setDone();
232 
233                 return newHandles.size();
234             } catch (Exception e) {
235                 req.setException(e);
236             } finally {
237                 // Roll back if failed to bind all addresses.
238                 if (req.getException() != null) {
239                     for (DatagramChannel handle : newHandles.values()) {
240                         try {
241                             close(handle);
242                         } catch (Exception e) {
243                             ExceptionMonitor.getInstance().exceptionCaught(e);
244                         }
245                     }
246 
247                     wakeup();
248                 }
249             }
250         }
251 
252         return 0;
253     }
254 
255     private void processReadySessions(Set<SelectionKey> handles) {
256         Iterator<SelectionKey> iterator = handles.iterator();
257 
258         while (iterator.hasNext()) {
259             SelectionKey key = iterator.next();
260             DatagramChannel handle = (DatagramChannel) key.channel();
261             iterator.remove();
262 
263             try {
264                 if ((key != null) && key.isValid() && key.isReadable()) {
265                     readHandle(handle);
266                 }
267 
268                 if ((key != null) && key.isValid() && key.isWritable()) {
269                     for (IoSession session : getManagedSessions().values()) {
270                         scheduleFlush((NioSession) session);
271                     }
272                 }
273             } catch (Throwable t) {
274                 ExceptionMonitor.getInstance().exceptionCaught(t);
275             }
276         }
277     }
278 
279     private boolean scheduleFlush(NioSession session) {
280         // Set the schedule for flush flag if the session
281         // has not already be added to the flushingSessions
282         // queue
283         if (session.setScheduledForFlush(true)) {
284             flushingSessions.add(session);
285             return true;
286         } else {
287             return false;
288         }
289     }
290 
291     private void readHandle(DatagramChannel handle) throws Exception {
292         IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
293 
294         SocketAddress remoteAddress = receive(handle, readBuf);
295 
296         if (remoteAddress != null) {
297             IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
298 
299             readBuf.flip();
300 
301             session.getFilterChain().fireMessageReceived(readBuf);
302         }
303     }
304 
305     private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
306         DatagramChannel handle = boundHandles.get(localAddress);
307 
308         if (handle == null) {
309             throw new IllegalArgumentException("Unknown local address: " + localAddress);
310         }
311 
312         IoSession session;
313 
314         synchronized (sessionRecycler) {
315             session = sessionRecycler.recycle(remoteAddress);
316 
317             if (session != null) {
318                 return session;
319             }
320 
321             // If a new session needs to be created.
322             NioSession newSession = newSession(this, handle, remoteAddress);
323             getSessionRecycler().put(newSession);
324             session = newSession;
325         }
326 
327         initSession(session, null, null);
328 
329         try {
330             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
331             getListeners().fireSessionCreated(session);
332         } catch (Throwable t) {
333             ExceptionMonitor.getInstance().exceptionCaught(t);
334         }
335 
336         return session;
337     }
338 
339     private void flushSessions(long currentTime) {
340         for (;;) {
341             NioSession session = flushingSessions.poll();
342 
343             if (session == null) {
344                 break;
345             }
346 
347             // Reset the Schedule for flush flag for this session,
348             // as we are flushing it now
349             session.unscheduledForFlush();
350 
351             try {
352                 boolean flushedAll = flush(session, currentTime);
353 
354                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
355                     scheduleFlush(session);
356                 }
357             } catch (Exception e) {
358                 session.getFilterChain().fireExceptionCaught(e);
359             }
360         }
361     }
362 
363     private boolean flush(NioSession session, long currentTime) throws Exception {
364         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
365         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
366                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
367 
368         int writtenBytes = 0;
369 
370         try {
371             for (;;) {
372                 WriteRequest req = session.getCurrentWriteRequest();
373 
374                 if (req == null) {
375                     req = writeRequestQueue.poll(session);
376 
377                     if (req == null) {
378                         setInterestedInWrite(session, false);
379                         break;
380                     }
381 
382                     session.setCurrentWriteRequest(req);
383                 }
384 
385                 IoBuffer buf = (IoBuffer) req.getMessage();
386 
387                 if (buf.remaining() == 0) {
388                     // Clear and fire event
389                     session.setCurrentWriteRequest(null);
390                     buf.reset();
391                     session.getFilterChain().fireMessageSent(req);
392                     continue;
393                 }
394 
395                 SocketAddress destination = req.getDestination();
396 
397                 if (destination == null) {
398                     destination = session.getRemoteAddress();
399                 }
400 
401                 int localWrittenBytes = send(session, buf, destination);
402 
403                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
404                     // Kernel buffer is full or wrote too much
405                     setInterestedInWrite(session, true);
406 
407                     return false;
408                 } else {
409                     setInterestedInWrite(session, false);
410 
411                     // Clear and fire event
412                     session.setCurrentWriteRequest(null);
413                     writtenBytes += localWrittenBytes;
414                     buf.reset();
415                     session.getFilterChain().fireMessageSent(req);
416                 }
417             }
418         } finally {
419             session.increaseWrittenBytes(writtenBytes, currentTime);
420         }
421 
422         return true;
423     }
424 
425     private int unregisterHandles() {
426         int nHandles = 0;
427 
428         for (;;) {
429             AcceptorOperationFuture request = cancelQueue.poll();
430             if (request == null) {
431                 break;
432             }
433 
434             // close the channels
435             for (SocketAddress socketAddress : request.getLocalAddresses()) {
436                 DatagramChannel handle = boundHandles.remove(socketAddress);
437 
438                 if (handle == null) {
439                     continue;
440                 }
441 
442                 try {
443                     close(handle);
444                     wakeup(); // wake up again to trigger thread death
445                 } catch (Throwable e) {
446                     ExceptionMonitor.getInstance().exceptionCaught(e);
447                 } finally {
448                     nHandles++;
449                 }
450             }
451 
452             request.setDone();
453         }
454 
455         return nHandles;
456     }
457 
458     private void notifyIdleSessions(long currentTime) {
459         // process idle sessions
460         if (currentTime - lastIdleCheckTime >= 1000) {
461             lastIdleCheckTime = currentTime;
462             AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
463         }
464     }
465 
466     /**
467      * Starts the inner Acceptor thread.
468      */
469     private void startupAcceptor() throws InterruptedException {
470         if (!selectable) {
471             registerQueue.clear();
472             cancelQueue.clear();
473             flushingSessions.clear();
474         }
475 
476         lock.acquire();
477 
478         if (acceptor == null) {
479             acceptor = new Acceptor();
480             executeWorker(acceptor);
481         } else {
482             lock.release();
483         }
484     }
485 
486     protected void init() throws Exception {
487         this.selector = Selector.open();
488     }
489 
490     /**
491      * {@inheritDoc}
         * <p>
         * This implementation does nothing.
492      */
493     public void add(NioSession session) {
494         // Intentionally empty: nothing to do for UDP
495     }
496 
497     /**
498      * {@inheritDoc}
499      */
500     @Override
501     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
502         // Create a bind request as a Future operation. When the selector
503         // have handled the registration, it will signal this future.
504         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
505 
506         // adds the Registration request to the queue for the Workers
507         // to handle
508         registerQueue.add(request);
509 
510         // creates the Acceptor instance and has the local
511         // executor kick it off.
512         startupAcceptor();
513 
514         // As we just started the acceptor, we have to unblock the select()
515         // in order to process the bind request we just have added to the
516         // registerQueue.
517         try {
518             lock.acquire();
519 
520             // Wait a bit to give a chance to the Acceptor thread to do the select()
521             Thread.sleep(10);
522             wakeup();
523         } finally {
524             lock.release();
525         }
526 
527         // Now, we wait until this request is completed.
528         request.awaitUninterruptibly();
529 
530         if (request.getException() != null) {
531             throw request.getException();
532         }
533 
534         // Update the local addresses.
535         // setLocalAddresses() shouldn't be called from the worker thread
536         // because of deadlock.
537         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
538 
539         for (DatagramChannel handle : boundHandles.values()) {
540             newLocalAddresses.add(localAddress(handle));
541         }
542 
543         return newLocalAddresses;
544     }
545 
546     protected void close(DatagramChannel handle) throws Exception {
547         SelectionKey key = handle.keyFor(selector);
548 
549         if (key != null) {
550             key.cancel();
551         }
552 
553         handle.disconnect();
554         handle.close();
555     }
556 
557     protected void destroy() throws Exception {
558         if (selector != null) {
559             selector.close();
560         }
561     }
562 
563     /**
564      * {@inheritDoc}
         * <p>
         * Unbinds the service, makes sure an Acceptor worker is running and wakes
         * up the selector.
565      */
566     @Override
567     protected void dispose0() throws Exception {
568         unbind();
            // NOTE(review): presumably a worker is (re)started so that it can run the
            // disposal branch at the end of Acceptor.run() (destroy + disposalFuture) - confirm.
569         startupAcceptor();
            // Interrupt a pending select() so the worker notices the new state promptly.
570         wakeup();
571     }
572 
573     /**
574      * {@inheritDoc}
575      */
576     public void flush(NioSession session) {
577         if (scheduleFlush(session)) {
578             wakeup();
579         }
580     }
581 
582     @Override
583     public InetSocketAddress getDefaultLocalAddress() {
584         return (InetSocketAddress) super.getDefaultLocalAddress();
585     }
586 
587     @Override
588     public InetSocketAddress getLocalAddress() {
589         return (InetSocketAddress) super.getLocalAddress();
590     }
591 
592     /**
593      * {@inheritDoc}
594      */
595     public DatagramSessionConfig getSessionConfig() {
596         return (DatagramSessionConfig) sessionConfig;
597     }
598 
599     public final IoSessionRecycler getSessionRecycler() {
600         return sessionRecycler;
601     }
602 
603     public TransportMetadata getTransportMetadata() {
604         return NioDatagramSession.METADATA;
605     }
606 
607     protected boolean isReadable(DatagramChannel handle) {
608         SelectionKey key = handle.keyFor(selector);
609 
610         if ((key == null) || (!key.isValid())) {
611             return false;
612         }
613 
614         return key.isReadable();
615     }
616 
617     protected boolean isWritable(DatagramChannel handle) {
618         SelectionKey key = handle.keyFor(selector);
619 
620         if ((key == null) || (!key.isValid())) {
621             return false;
622         }
623 
624         return key.isWritable();
625     }
626 
627     protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
628         InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
629         InetAddress inetAddress = inetSocketAddress.getAddress();
630 
631         if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
632             // Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
633             // even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
634             // ANY address in the map.
635             byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
636             byte[] ipV4Address = new byte[4];
637 
638             for (int i = 0; i < 4; i++) {
639                 ipV4Address[i] = ipV6Address[12 + i];
640             }
641 
642             InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
643             return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
644         } else {
645             return inetSocketAddress;
646         }
647     }
648 
649     protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
650             SocketAddress remoteAddress) {
651         SelectionKey key = handle.keyFor(selector);
652 
653         if ((key == null) || (!key.isValid())) {
654             return null;
655         }
656 
657         NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
658         newSession.setSelectionKey(key);
659 
660         return newSession;
661     }
662 
663     /**
664      * {@inheritDoc}
665      */
666     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
667         if (isDisposing()) {
668             throw new IllegalStateException("Already disposed.");
669         }
670 
671         if (remoteAddress == null) {
672             throw new IllegalArgumentException("remoteAddress");
673         }
674 
675         synchronized (bindLock) {
676             if (!isActive()) {
677                 throw new IllegalStateException("Can't create a session from a unbound service.");
678             }
679 
680             try {
681                 return newSessionWithoutLock(remoteAddress, localAddress);
682             } catch (RuntimeException e) {
683                 throw e;
684             } catch (Error e) {
685                 throw e;
686             } catch (Exception e) {
687                 throw new RuntimeIoException("Failed to create a session.", e);
688             }
689         }
690     }
691 
692     protected DatagramChannel open(SocketAddress localAddress) throws Exception {
693         final DatagramChannel c = DatagramChannel.open();
694         boolean success = false;
695         try {
696             new NioDatagramSessionConfig(c).setAll(getSessionConfig());
697             c.configureBlocking(false);
698             c.socket().bind(localAddress);
699             c.register(selector, SelectionKey.OP_READ);
700             success = true;
701         } finally {
702             if (!success) {
703                 close(c);
704             }
705         }
706 
707         return c;
708     }
709 
710     protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
            // Receives one datagram into the NIO buffer obtained from the IoBuffer and
            // returns the address it came from; the caller treats null as "nothing received".
711         return handle.receive(buffer.buf());
712     }
713 
714     /**
715      * {@inheritDoc}
716      */
717     public void remove(NioSession session) {
718         getSessionRecycler().remove(session);
719         getListeners().fireSessionDestroyed(session);
720     }
721 
722     protected int select() throws Exception {
723         return selector.select();
724     }
725 
726     protected int select(long timeout) throws Exception {
727         return selector.select(timeout);
728     }
729 
730     protected Set<SelectionKey> selectedHandles() {
731         return selector.selectedKeys();
732     }
733 
734     protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
735         return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
736     }
737 
738     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
739         setDefaultLocalAddress((SocketAddress) localAddress);
740     }
741 
742     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
743         SelectionKey key = session.getSelectionKey();
744 
745         if (key == null) {
746             return;
747         }
748 
749         int newInterestOps = key.interestOps();
750 
751         if (isInterested) {
752             newInterestOps |= SelectionKey.OP_WRITE;
753             //newInterestOps &= ~SelectionKey.OP_READ;
754         } else {
755             newInterestOps &= ~SelectionKey.OP_WRITE;
756             //newInterestOps |= SelectionKey.OP_READ;
757         }
758 
759         key.interestOps(newInterestOps);
760     }
761 
762     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
763         synchronized (bindLock) {
764             if (isActive()) {
765                 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
766             }
767 
768             if (sessionRecycler == null) {
769                 sessionRecycler = DEFAULT_RECYCLER;
770             }
771 
772             this.sessionRecycler = sessionRecycler;
773         }
774     }
775 
776     /**
777      * {@inheritDoc}
778      */
779     @Override
780     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
781         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
782 
783         cancelQueue.add(request);
784         startupAcceptor();
785         wakeup();
786 
787         request.awaitUninterruptibly();
788 
789         if (request.getException() != null) {
790             throw request.getException();
791         }
792     }
793 
794     /**
795      * {@inheritDoc}
         * <p>
         * Not supported by this acceptor: always throws {@link UnsupportedOperationException}.
796      */
797     public void updateTrafficControl(NioSession session) {
798         throw new UnsupportedOperationException();
799     }
800 
801     protected void wakeup() {
802         selector.wakeup();
803     }
804 
805     /**
806      * {@inheritDoc}
807      */
808     public void write(NioSession session, WriteRequest writeRequest) {
809         // We will try to write the message directly
810         long currentTime = System.currentTimeMillis();
811         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
812         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
813                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
814 
815         int writtenBytes = 0;
816 
817         // Deal with the special case of a Message marker (no bytes in the request)
818         // We just have to return after having calle dthe messageSent event
819         IoBuffer buf = (IoBuffer) writeRequest.getMessage();
820 
821         if (buf.remaining() == 0) {
822             // Clear and fire event
823             session.setCurrentWriteRequest(null);
824             buf.reset();
825             session.getFilterChain().fireMessageSent(writeRequest);
826             return;
827         }
828 
829         // Now, write the data
830         try {
831             for (;;) {
832                 if (writeRequest == null) {
833                     writeRequest = writeRequestQueue.poll(session);
834 
835                     if (writeRequest == null) {
836                         setInterestedInWrite(session, false);
837                         break;
838                     }
839 
840                     session.setCurrentWriteRequest(writeRequest);
841                 }
842 
843                 buf = (IoBuffer) writeRequest.getMessage();
844 
845                 if (buf.remaining() == 0) {
846                     // Clear and fire event
847                     session.setCurrentWriteRequest(null);
848                     buf.reset();
849                     session.getFilterChain().fireMessageSent(writeRequest);
850                     continue;
851                 }
852 
853                 SocketAddress destination = writeRequest.getDestination();
854 
855                 if (destination == null) {
856                     destination = session.getRemoteAddress();
857                 }
858 
859                 int localWrittenBytes = send(session, buf, destination);
860 
861                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
862                     // Kernel buffer is full or wrote too much
863                     setInterestedInWrite(session, true);
864 
865                     session.getWriteRequestQueue().offer(session, writeRequest);
866                     scheduleFlush(session);
867                 } else {
868                     setInterestedInWrite(session, false);
869 
870                     // Clear and fire event
871                     session.setCurrentWriteRequest(null);
872                     writtenBytes += localWrittenBytes;
873                     buf.reset();
874                     session.getFilterChain().fireMessageSent(writeRequest);
875 
876                     break;
877                 }
878             }
879         } catch (Exception e) {
880             session.getFilterChain().fireExceptionCaught(e);
881         } finally {
882             session.increaseWrittenBytes(writtenBytes, currentTime);
883         }
884     }
885 }