View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.Inet4Address;
24  import java.net.Inet6Address;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.SocketAddress;
28  import java.nio.channels.ClosedSelectorException;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.SelectionKey;
31  import java.nio.channels.Selector;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Queue;
39  import java.util.Set;
40  import java.util.concurrent.ConcurrentLinkedQueue;
41  import java.util.concurrent.Executor;
42  import java.util.concurrent.Semaphore;
43  
44  import org.apache.mina.core.RuntimeIoException;
45  import org.apache.mina.core.buffer.IoBuffer;
46  import org.apache.mina.core.service.AbstractIoAcceptor;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoProcessor;
49  import org.apache.mina.core.service.TransportMetadata;
50  import org.apache.mina.core.session.AbstractIoSession;
51  import org.apache.mina.core.session.ExpiringSessionRecycler;
52  import org.apache.mina.core.session.IoSession;
53  import org.apache.mina.core.session.IoSessionConfig;
54  import org.apache.mina.core.session.IoSessionRecycler;
55  import org.apache.mina.core.write.WriteRequest;
56  import org.apache.mina.core.write.WriteRequestQueue;
57  import org.apache.mina.transport.socket.DatagramAcceptor;
58  import org.apache.mina.transport.socket.DatagramSessionConfig;
59  import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
60  import org.apache.mina.util.ExceptionMonitor;
61  
62  /**
63   * {@link IoAcceptor} for datagram transport (UDP/IP).
64   *
65   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
66   * @org.apache.xbean.XBean
67   */
68  public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
69      /**
70       * A session recycler that is used to retrieve an existing session, unless it's too old.
71       **/
72      private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
73  
74      /**
75       * A timeout used for the select, as we need to get out to deal with idle
76       * sessions
77       */
78      private static final long SELECT_TIMEOUT = 1000L;
79  
80      /** A lock used to protect the selector to be waked up before it's created */
81      private final Semaphore lock = new Semaphore(1);
82  
83      /** A queue used to store the list of pending Binds */
84      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
85  
86      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
87  
88      private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<NioSession>();
89  
90      private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
91              .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
92  
93      private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
94  
95      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
96  
97      private volatile boolean selectable;
98  
99      /** The thread responsible of accepting incoming requests */
100     private Acceptor acceptor;
101 
102     private long lastIdleCheckTime;
103 
104     /** The Selector used by this acceptor */
105     private volatile Selector selector;
106 
107     /**
108      * Creates a new instance.
109      */
110     public NioDatagramAcceptor() {
111         this(new DefaultDatagramSessionConfig(), null);
112     }
113 
114     /**
115      * Creates a new instance.
116      */
117     public NioDatagramAcceptor(Executor executor) {
118         this(new DefaultDatagramSessionConfig(), executor);
119     }
120 
121     /**
122      * Creates a new instance.
123      */
124     private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
125         super(sessionConfig, executor);
126 
127         try {
128             init();
129             selectable = true;
130         } catch (RuntimeException e) {
131             throw e;
132         } catch (Exception e) {
133             throw new RuntimeIoException("Failed to initialize.", e);
134         } finally {
135             if (!selectable) {
136                 try {
137                     destroy();
138                 } catch (Exception e) {
139                     ExceptionMonitor.getInstance().exceptionCaught(e);
140                 }
141             }
142         }
143     }
144 
145     /**
146      * This private class is used to accept incoming connection from
147      * clients. It's an infinite loop, which can be stopped when all
148      * the registered handles have been removed (unbound).
149      */
150     private class Acceptor implements Runnable {
151         public void run() {
152             int nHandles = 0;
153             lastIdleCheckTime = System.currentTimeMillis();
154 
155             // Release the lock
156             lock.release();
157 
158             while (selectable) {
159                 try {
160                     int selected = select(SELECT_TIMEOUT);
161 
162                     nHandles += registerHandles();
163 
164                     if (nHandles == 0) {
165                         try {
166                             lock.acquire();
167 
168                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
169                                 acceptor = null;
170                                 break;
171                             }
172                         } finally {
173                             lock.release();
174                         }
175                     }
176 
177                     if (selected > 0) {
178                         processReadySessions(selectedHandles());
179                     }
180 
181                     long currentTime = System.currentTimeMillis();
182                     flushSessions(currentTime);
183                     nHandles -= unregisterHandles();
184 
185                     notifyIdleSessions(currentTime);
186                 } catch (ClosedSelectorException cse) {
187                     // If the selector has been closed, we can exit the loop
188                     ExceptionMonitor.getInstance().exceptionCaught(cse);
189                     break;
190                 } catch (Exception e) {
191                     ExceptionMonitor.getInstance().exceptionCaught(e);
192 
193                     try {
194                         Thread.sleep(1000);
195                     } catch (InterruptedException e1) {
196                     }
197                 }
198             }
199 
200             if (selectable && isDisposing()) {
201                 selectable = false;
202                 try {
203                     destroy();
204                 } catch (Exception e) {
205                     ExceptionMonitor.getInstance().exceptionCaught(e);
206                 } finally {
207                     disposalFuture.setValue(true);
208                 }
209             }
210         }
211     }
212 
213     private int registerHandles() {
214         for (;;) {
215             AcceptorOperationFuture req = registerQueue.poll();
216 
217             if (req == null) {
218                 break;
219             }
220 
221             Map<SocketAddress, DatagramChannel> newHandles = new HashMap<SocketAddress, DatagramChannel>();
222             List<SocketAddress> localAddresses = req.getLocalAddresses();
223 
224             try {
225                 for (SocketAddress socketAddress : localAddresses) {
226                     DatagramChannel handle = open(socketAddress);
227                     newHandles.put(localAddress(handle), handle);
228                 }
229 
230                 boundHandles.putAll(newHandles);
231 
232                 getListeners().fireServiceActivated();
233                 req.setDone();
234 
235                 return newHandles.size();
236             } catch (Exception e) {
237                 req.setException(e);
238             } finally {
239                 // Roll back if failed to bind all addresses.
240                 if (req.getException() != null) {
241                     for (DatagramChannel handle : newHandles.values()) {
242                         try {
243                             close(handle);
244                         } catch (Exception e) {
245                             ExceptionMonitor.getInstance().exceptionCaught(e);
246                         }
247                     }
248 
249                     wakeup();
250                 }
251             }
252         }
253 
254         return 0;
255     }
256 
257     private void processReadySessions(Set<SelectionKey> handles) {
258         Iterator<SelectionKey> iterator = handles.iterator();
259 
260         while (iterator.hasNext()) {
261             SelectionKey key = iterator.next();
262             DatagramChannel handle = (DatagramChannel) key.channel();
263             iterator.remove();
264 
265             try {
266                 if ((key != null) && key.isValid() && key.isReadable()) {
267                     readHandle(handle);
268                 }
269 
270                 if ((key != null) && key.isValid() && key.isWritable()) {
271                     for (IoSession session : getManagedSessions().values()) {
272                         scheduleFlush((NioSession) session);
273                     }
274                 }
275             } catch (Exception e) {
276                 ExceptionMonitor.getInstance().exceptionCaught(e);
277             }
278         }
279     }
280 
281     private boolean scheduleFlush(NioSession session) {
282         // Set the schedule for flush flag if the session
283         // has not already be added to the flushingSessions
284         // queue
285         if (session.setScheduledForFlush(true)) {
286             flushingSessions.add(session);
287             return true;
288         } else {
289             return false;
290         }
291     }
292 
293     private void readHandle(DatagramChannel handle) throws Exception {
294         IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
295 
296         SocketAddress remoteAddress = receive(handle, readBuf);
297 
298         if (remoteAddress != null) {
299             IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
300 
301             readBuf.flip();
302 
303             session.getFilterChain().fireMessageReceived(readBuf);
304         }
305     }
306 
307     private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
308         DatagramChannel handle = boundHandles.get(localAddress);
309 
310         if (handle == null) {
311             throw new IllegalArgumentException("Unknown local address: " + localAddress);
312         }
313 
314         IoSession session;
315 
316         synchronized (sessionRecycler) {
317             session = sessionRecycler.recycle(remoteAddress);
318 
319             if (session != null) {
320                 return session;
321             }
322 
323             // If a new session needs to be created.
324             NioSession newSession = newSession(this, handle, remoteAddress);
325             getSessionRecycler().put(newSession);
326             session = newSession;
327         }
328 
329         initSession(session, null, null);
330 
331         try {
332             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
333             getListeners().fireSessionCreated(session);
334         } catch (Exception e) {
335             ExceptionMonitor.getInstance().exceptionCaught(e);
336         }
337 
338         return session;
339     }
340 
341     private void flushSessions(long currentTime) {
342         for (;;) {
343             NioSession session = flushingSessions.poll();
344 
345             if (session == null) {
346                 break;
347             }
348 
349             // Reset the Schedule for flush flag for this session,
350             // as we are flushing it now
351             session.unscheduledForFlush();
352 
353             try {
354                 boolean flushedAll = flush(session, currentTime);
355 
356                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
357                     scheduleFlush(session);
358                 }
359             } catch (Exception e) {
360                 session.getFilterChain().fireExceptionCaught(e);
361             }
362         }
363     }
364 
365     private boolean flush(NioSession session, long currentTime) throws Exception {
366         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
367         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
368                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
369 
370         int writtenBytes = 0;
371 
372         try {
373             for (;;) {
374                 WriteRequest req = session.getCurrentWriteRequest();
375 
376                 if (req == null) {
377                     req = writeRequestQueue.poll(session);
378 
379                     if (req == null) {
380                         setInterestedInWrite(session, false);
381                         break;
382                     }
383 
384                     session.setCurrentWriteRequest(req);
385                 }
386 
387                 IoBuffer buf = (IoBuffer) req.getMessage();
388 
389                 if (buf.remaining() == 0) {
390                     // Clear and fire event
391                     session.setCurrentWriteRequest(null);
392                     buf.reset();
393                     session.getFilterChain().fireMessageSent(req);
394                     continue;
395                 }
396 
397                 SocketAddress destination = req.getDestination();
398 
399                 if (destination == null) {
400                     destination = session.getRemoteAddress();
401                 }
402 
403                 int localWrittenBytes = send(session, buf, destination);
404 
405                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
406                     // Kernel buffer is full or wrote too much
407                     setInterestedInWrite(session, true);
408 
409                     return false;
410                 } else {
411                     setInterestedInWrite(session, false);
412 
413                     // Clear and fire event
414                     session.setCurrentWriteRequest(null);
415                     writtenBytes += localWrittenBytes;
416                     buf.reset();
417                     session.getFilterChain().fireMessageSent(req);
418                 }
419             }
420         } finally {
421             session.increaseWrittenBytes(writtenBytes, currentTime);
422         }
423 
424         return true;
425     }
426 
427     private int unregisterHandles() {
428         int nHandles = 0;
429 
430         for (;;) {
431             AcceptorOperationFuture request = cancelQueue.poll();
432             if (request == null) {
433                 break;
434             }
435 
436             // close the channels
437             for (SocketAddress socketAddress : request.getLocalAddresses()) {
438                 DatagramChannel handle = boundHandles.remove(socketAddress);
439 
440                 if (handle == null) {
441                     continue;
442                 }
443 
444                 try {
445                     close(handle);
446                     wakeup(); // wake up again to trigger thread death
447                 } catch (Exception e) {
448                     ExceptionMonitor.getInstance().exceptionCaught(e);
449                 } finally {
450                     nHandles++;
451                 }
452             }
453 
454             request.setDone();
455         }
456 
457         return nHandles;
458     }
459 
460     private void notifyIdleSessions(long currentTime) {
461         // process idle sessions
462         if (currentTime - lastIdleCheckTime >= 1000) {
463             lastIdleCheckTime = currentTime;
464             AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
465         }
466     }
467 
468     /**
469      * Starts the inner Acceptor thread.
470      */
471     private void startupAcceptor() throws InterruptedException {
472         if (!selectable) {
473             registerQueue.clear();
474             cancelQueue.clear();
475             flushingSessions.clear();
476         }
477 
478         lock.acquire();
479 
480         if (acceptor == null) {
481             acceptor = new Acceptor();
482             executeWorker(acceptor);
483         } else {
484             lock.release();
485         }
486     }
487 
488     protected void init() throws Exception {
489         this.selector = Selector.open();
490     }
491 
492     /**
493      * {@inheritDoc}
494      */
495     public void add(NioSession session) {
496         // Nothing to do for UDP
497     }
498 
499     /**
500      * {@inheritDoc}
501      */
502     @Override
503     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
504         // Create a bind request as a Future operation. When the selector
505         // have handled the registration, it will signal this future.
506         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
507 
508         // adds the Registration request to the queue for the Workers
509         // to handle
510         registerQueue.add(request);
511 
512         // creates the Acceptor instance and has the local
513         // executor kick it off.
514         startupAcceptor();
515 
516         // As we just started the acceptor, we have to unblock the select()
517         // in order to process the bind request we just have added to the
518         // registerQueue.
519         try {
520             lock.acquire();
521 
522             // Wait a bit to give a chance to the Acceptor thread to do the select()
523             Thread.sleep(10);
524             wakeup();
525         } finally {
526             lock.release();
527         }
528 
529         // Now, we wait until this request is completed.
530         request.awaitUninterruptibly();
531 
532         if (request.getException() != null) {
533             throw request.getException();
534         }
535 
536         // Update the local addresses.
537         // setLocalAddresses() shouldn't be called from the worker thread
538         // because of deadlock.
539         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
540 
541         for (DatagramChannel handle : boundHandles.values()) {
542             newLocalAddresses.add(localAddress(handle));
543         }
544 
545         return newLocalAddresses;
546     }
547 
548     protected void close(DatagramChannel handle) throws Exception {
549         SelectionKey key = handle.keyFor(selector);
550 
551         if (key != null) {
552             key.cancel();
553         }
554 
555         handle.disconnect();
556         handle.close();
557     }
558 
559     protected void destroy() throws Exception {
560         if (selector != null) {
561             selector.close();
562         }
563     }
564 
565     /**
566      * {@inheritDoc}
567      */
568     @Override
569     protected void dispose0() throws Exception {
570         unbind();
571         startupAcceptor();
572         wakeup();
573     }
574 
575     /**
576      * {@inheritDoc}
577      */
578     public void flush(NioSession session) {
579         if (scheduleFlush(session)) {
580             wakeup();
581         }
582     }
583 
584     @Override
585     public InetSocketAddress getDefaultLocalAddress() {
586         return (InetSocketAddress) super.getDefaultLocalAddress();
587     }
588 
589     @Override
590     public InetSocketAddress getLocalAddress() {
591         return (InetSocketAddress) super.getLocalAddress();
592     }
593 
594     /**
595      * {@inheritDoc}
596      */
597     public DatagramSessionConfig getSessionConfig() {
598         return (DatagramSessionConfig) sessionConfig;
599     }
600 
601     public final IoSessionRecycler getSessionRecycler() {
602         return sessionRecycler;
603     }
604 
605     public TransportMetadata getTransportMetadata() {
606         return NioDatagramSession.METADATA;
607     }
608 
609     protected boolean isReadable(DatagramChannel handle) {
610         SelectionKey key = handle.keyFor(selector);
611 
612         if ((key == null) || (!key.isValid())) {
613             return false;
614         }
615 
616         return key.isReadable();
617     }
618 
619     protected boolean isWritable(DatagramChannel handle) {
620         SelectionKey key = handle.keyFor(selector);
621 
622         if ((key == null) || (!key.isValid())) {
623             return false;
624         }
625 
626         return key.isWritable();
627     }
628 
629     protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
630         InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
631         InetAddress inetAddress = inetSocketAddress.getAddress();
632 
633         if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
634             // Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
635             // even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
636             // ANY address in the map.
637             byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
638             byte[] ipV4Address = new byte[4];
639 
640             for (int i = 0; i < 4; i++) {
641                 ipV4Address[i] = ipV6Address[12 + i];
642             }
643 
644             InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
645             return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
646         } else {
647             return inetSocketAddress;
648         }
649     }
650 
651     protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
652             SocketAddress remoteAddress) {
653         SelectionKey key = handle.keyFor(selector);
654 
655         if ((key == null) || (!key.isValid())) {
656             return null;
657         }
658 
659         NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
660         newSession.setSelectionKey(key);
661 
662         return newSession;
663     }
664 
665     /**
666      * {@inheritDoc}
667      */
668     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
669         if (isDisposing()) {
670             throw new IllegalStateException("Already disposed.");
671         }
672 
673         if (remoteAddress == null) {
674             throw new IllegalArgumentException("remoteAddress");
675         }
676 
677         synchronized (bindLock) {
678             if (!isActive()) {
679                 throw new IllegalStateException("Can't create a session from a unbound service.");
680             }
681 
682             try {
683                 return newSessionWithoutLock(remoteAddress, localAddress);
684             } catch (RuntimeException e) {
685                 throw e;
686             } catch (Error e) {
687                 throw e;
688             } catch (Exception e) {
689                 throw new RuntimeIoException("Failed to create a session.", e);
690             }
691         }
692     }
693 
694     protected DatagramChannel open(SocketAddress localAddress) throws Exception {
695         final DatagramChannel ch = DatagramChannel.open();
696         boolean success = false;
697         try {
698             new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
699             ch.configureBlocking(false);
700 
701             try {
702                 ch.socket().bind(localAddress);
703             } catch (IOException ioe) {
704                 // Add some info regarding the address we try to bind to the
705                 // message
706                 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
707                         + ioe.getMessage();
708                 Exception e = new IOException(newMessage);
709                 e.initCause(ioe.getCause());
710 
711                 // And close the channel
712                 ch.close();
713 
714                 throw e;
715             }
716 
717             ch.register(selector, SelectionKey.OP_READ);
718             success = true;
719         } finally {
720             if (!success) {
721                 close(ch);
722             }
723         }
724 
725         return ch;
726     }
727 
728     protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
729         return handle.receive(buffer.buf());
730     }
731 
732     /**
733      * {@inheritDoc}
734      */
735     public void remove(NioSession session) {
736         getSessionRecycler().remove(session);
737         getListeners().fireSessionDestroyed(session);
738     }
739 
740     protected int select() throws Exception {
741         return selector.select();
742     }
743 
744     protected int select(long timeout) throws Exception {
745         return selector.select(timeout);
746     }
747 
748     protected Set<SelectionKey> selectedHandles() {
749         return selector.selectedKeys();
750     }
751 
752     protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
753         return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
754     }
755 
756     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
757         setDefaultLocalAddress((SocketAddress) localAddress);
758     }
759 
760     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
761         SelectionKey key = session.getSelectionKey();
762 
763         if (key == null) {
764             return;
765         }
766 
767         int newInterestOps = key.interestOps();
768 
769         if (isInterested) {
770             newInterestOps |= SelectionKey.OP_WRITE;
771         } else {
772             newInterestOps &= ~SelectionKey.OP_WRITE;
773         }
774 
775         key.interestOps(newInterestOps);
776     }
777 
778     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
779         synchronized (bindLock) {
780             if (isActive()) {
781                 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
782             }
783 
784             if (sessionRecycler == null) {
785                 sessionRecycler = DEFAULT_RECYCLER;
786             }
787 
788             this.sessionRecycler = sessionRecycler;
789         }
790     }
791 
792     /**
793      * {@inheritDoc}
794      */
795     @Override
796     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
797         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
798 
799         cancelQueue.add(request);
800         startupAcceptor();
801         wakeup();
802 
803         request.awaitUninterruptibly();
804 
805         if (request.getException() != null) {
806             throw request.getException();
807         }
808     }
809 
810     /**
811      * {@inheritDoc}
812      */
813     public void updateTrafficControl(NioSession session) {
814         throw new UnsupportedOperationException();
815     }
816 
817     protected void wakeup() {
818         selector.wakeup();
819     }
820 
821     /**
822      * {@inheritDoc}
823      */
824     public void write(NioSession session, WriteRequest writeRequest) {
825         // We will try to write the message directly
826         long currentTime = System.currentTimeMillis();
827         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
828         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
829                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
830 
831         int writtenBytes = 0;
832 
833         // Deal with the special case of a Message marker (no bytes in the request)
834         // We just have to return after having calle dthe messageSent event
835         IoBuffer buf = (IoBuffer) writeRequest.getMessage();
836 
837         if (buf.remaining() == 0) {
838             // Clear and fire event
839             session.setCurrentWriteRequest(null);
840             buf.reset();
841             session.getFilterChain().fireMessageSent(writeRequest);
842             return;
843         }
844 
845         // Now, write the data
846         try {
847             for (;;) {
848                 if (writeRequest == null) {
849                     writeRequest = writeRequestQueue.poll(session);
850 
851                     if (writeRequest == null) {
852                         setInterestedInWrite(session, false);
853                         break;
854                     }
855 
856                     session.setCurrentWriteRequest(writeRequest);
857                 }
858 
859                 buf = (IoBuffer) writeRequest.getMessage();
860 
861                 if (buf.remaining() == 0) {
862                     // Clear and fire event
863                     session.setCurrentWriteRequest(null);
864                     buf.reset();
865                     session.getFilterChain().fireMessageSent(writeRequest);
866                     continue;
867                 }
868 
869                 SocketAddress destination = writeRequest.getDestination();
870 
871                 if (destination == null) {
872                     destination = session.getRemoteAddress();
873                 }
874 
875                 int localWrittenBytes = send(session, buf, destination);
876 
877                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
878                     // Kernel buffer is full or wrote too much
879                     setInterestedInWrite(session, true);
880 
881                     session.getWriteRequestQueue().offer(session, writeRequest);
882                     scheduleFlush(session);
883                 } else {
884                     setInterestedInWrite(session, false);
885 
886                     // Clear and fire event
887                     session.setCurrentWriteRequest(null);
888                     writtenBytes += localWrittenBytes;
889                     buf.reset();
890                     session.getFilterChain().fireMessageSent(writeRequest);
891 
892                     break;
893                 }
894             }
895         } catch (Exception e) {
896             session.getFilterChain().fireExceptionCaught(e);
897         } finally {
898             session.increaseWrittenBytes(writtenBytes, currentTime);
899         }
900     }
901 }