View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.nio;
21  
22  import java.io.IOException;
23  import java.net.Inet4Address;
24  import java.net.Inet6Address;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.SocketAddress;
28  import java.nio.channels.ClosedSelectorException;
29  import java.nio.channels.DatagramChannel;
30  import java.nio.channels.SelectionKey;
31  import java.nio.channels.Selector;
32  import java.util.Collections;
33  import java.util.HashMap;
34  import java.util.HashSet;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.Queue;
39  import java.util.Set;
40  import java.util.concurrent.ConcurrentLinkedQueue;
41  import java.util.concurrent.Executor;
42  import java.util.concurrent.Semaphore;
43  
44  import org.apache.mina.core.RuntimeIoException;
45  import org.apache.mina.core.buffer.IoBuffer;
46  import org.apache.mina.core.service.AbstractIoAcceptor;
47  import org.apache.mina.core.service.IoAcceptor;
48  import org.apache.mina.core.service.IoProcessor;
49  import org.apache.mina.core.service.TransportMetadata;
50  import org.apache.mina.core.session.AbstractIoSession;
51  import org.apache.mina.core.session.ExpiringSessionRecycler;
52  import org.apache.mina.core.session.IoSession;
53  import org.apache.mina.core.session.IoSessionConfig;
54  import org.apache.mina.core.session.IoSessionRecycler;
55  import org.apache.mina.core.write.WriteRequest;
56  import org.apache.mina.core.write.WriteRequestQueue;
57  import org.apache.mina.transport.socket.DatagramAcceptor;
58  import org.apache.mina.transport.socket.DatagramSessionConfig;
59  import org.apache.mina.transport.socket.DefaultDatagramSessionConfig;
60  import org.apache.mina.util.ExceptionMonitor;
61  
62  /**
63   * {@link IoAcceptor} for datagram transport (UDP/IP).
64   *
65   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
66   * @org.apache.xbean.XBean
67   */
68  public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
69      /**
70       * A session recycler that is used to retrieve an existing session, unless it's too old.
71       **/
72      private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
73  
74      /**
75       * A timeout used for the select, as we need to get out to deal with idle
76       * sessions
77       */
78      private static final long SELECT_TIMEOUT = 1000L;
79  
80      /** A lock used to protect the selector to be waked up before it's created */
81      private final Semaphore lock = new Semaphore(1);
82  
83      /** A queue used to store the list of pending Binds */
84      private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
85  
86      private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
87  
88      private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<NioSession>();
89  
90      private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
91              .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
92  
93      private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
94  
95      private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
96  
97      private volatile boolean selectable;
98  
99      /** The thread responsible of accepting incoming requests */
100     private Acceptor acceptor;
101 
102     private long lastIdleCheckTime;
103 
104     /** The Selector used by this acceptor */
105     private volatile Selector selector;
106 
107     /**
108      * Creates a new instance.
109      */
110     public NioDatagramAcceptor() {
111         this(new DefaultDatagramSessionConfig(), null);
112     }
113 
114     /**
115      * Creates a new instance.
116      * 
117      * @param executor The executor to use
118      */
119     public NioDatagramAcceptor(Executor executor) {
120         this(new DefaultDatagramSessionConfig(), executor);
121     }
122 
123     /**
124      * Creates a new instance.
125      */
126     private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
127         super(sessionConfig, executor);
128 
129         try {
130             init();
131             selectable = true;
132         } catch (RuntimeException e) {
133             throw e;
134         } catch (Exception e) {
135             throw new RuntimeIoException("Failed to initialize.", e);
136         } finally {
137             if (!selectable) {
138                 try {
139                     destroy();
140                 } catch (Exception e) {
141                     ExceptionMonitor.getInstance().exceptionCaught(e);
142                 }
143             }
144         }
145     }
146 
147     /**
148      * This private class is used to accept incoming connection from
149      * clients. It's an infinite loop, which can be stopped when all
150      * the registered handles have been removed (unbound).
151      */
152     private class Acceptor implements Runnable {
153         public void run() {
154             int nHandles = 0;
155             lastIdleCheckTime = System.currentTimeMillis();
156 
157             // Release the lock
158             lock.release();
159 
160             while (selectable) {
161                 try {
162                     int selected = select(SELECT_TIMEOUT);
163 
164                     nHandles += registerHandles();
165 
166                     if (nHandles == 0) {
167                         try {
168                             lock.acquire();
169 
170                             if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
171                                 acceptor = null;
172                                 break;
173                             }
174                         } finally {
175                             lock.release();
176                         }
177                     }
178 
179                     if (selected > 0) {
180                         processReadySessions(selectedHandles());
181                     }
182 
183                     long currentTime = System.currentTimeMillis();
184                     flushSessions(currentTime);
185                     nHandles -= unregisterHandles();
186 
187                     notifyIdleSessions(currentTime);
188                 } catch (ClosedSelectorException cse) {
189                     // If the selector has been closed, we can exit the loop
190                     ExceptionMonitor.getInstance().exceptionCaught(cse);
191                     break;
192                 } catch (Exception e) {
193                     ExceptionMonitor.getInstance().exceptionCaught(e);
194 
195                     try {
196                         Thread.sleep(1000);
197                     } catch (InterruptedException e1) {
198                     }
199                 }
200             }
201 
202             if (selectable && isDisposing()) {
203                 selectable = false;
204                 try {
205                     destroy();
206                 } catch (Exception e) {
207                     ExceptionMonitor.getInstance().exceptionCaught(e);
208                 } finally {
209                     disposalFuture.setValue(true);
210                 }
211             }
212         }
213     }
214 
215     private int registerHandles() {
216         for (;;) {
217             AcceptorOperationFuture req = registerQueue.poll();
218 
219             if (req == null) {
220                 break;
221             }
222 
223             Map<SocketAddress, DatagramChannel> newHandles = new HashMap<SocketAddress, DatagramChannel>();
224             List<SocketAddress> localAddresses = req.getLocalAddresses();
225 
226             try {
227                 for (SocketAddress socketAddress : localAddresses) {
228                     DatagramChannel handle = open(socketAddress);
229                     newHandles.put(localAddress(handle), handle);
230                 }
231 
232                 boundHandles.putAll(newHandles);
233 
234                 getListeners().fireServiceActivated();
235                 req.setDone();
236 
237                 return newHandles.size();
238             } catch (Exception e) {
239                 req.setException(e);
240             } finally {
241                 // Roll back if failed to bind all addresses.
242                 if (req.getException() != null) {
243                     for (DatagramChannel handle : newHandles.values()) {
244                         try {
245                             close(handle);
246                         } catch (Exception e) {
247                             ExceptionMonitor.getInstance().exceptionCaught(e);
248                         }
249                     }
250 
251                     wakeup();
252                 }
253             }
254         }
255 
256         return 0;
257     }
258 
259     private void processReadySessions(Set<SelectionKey> handles) {
260         Iterator<SelectionKey> iterator = handles.iterator();
261 
262         while (iterator.hasNext()) {
263             SelectionKey key = iterator.next();
264             DatagramChannel handle = (DatagramChannel) key.channel();
265             iterator.remove();
266 
267             try {
268                 if (key.isValid() && key.isReadable()) {
269                     readHandle(handle);
270                 }
271 
272                 if (key.isValid() && key.isWritable()) {
273                     for (IoSession session : getManagedSessions().values()) {
274                         scheduleFlush((NioSession) session);
275                     }
276                 }
277             } catch (Exception e) {
278                 ExceptionMonitor.getInstance().exceptionCaught(e);
279             }
280         }
281     }
282 
283     private boolean scheduleFlush(NioSession session) {
284         // Set the schedule for flush flag if the session
285         // has not already be added to the flushingSessions
286         // queue
287         if (session.setScheduledForFlush(true)) {
288             flushingSessions.add(session);
289             return true;
290         } else {
291             return false;
292         }
293     }
294 
295     private void readHandle(DatagramChannel handle) throws Exception {
296         IoBuffer readBuf = IoBuffer.allocate(getSessionConfig().getReadBufferSize());
297 
298         SocketAddress remoteAddress = receive(handle, readBuf);
299 
300         if (remoteAddress != null) {
301             IoSession session = newSessionWithoutLock(remoteAddress, localAddress(handle));
302 
303             readBuf.flip();
304 
305             session.getFilterChain().fireMessageReceived(readBuf);
306         }
307     }
308 
309     private IoSession newSessionWithoutLock(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
310         DatagramChannel handle = boundHandles.get(localAddress);
311 
312         if (handle == null) {
313             throw new IllegalArgumentException("Unknown local address: " + localAddress);
314         }
315 
316         IoSession session;
317 
318         synchronized (sessionRecycler) {
319             session = sessionRecycler.recycle(remoteAddress);
320 
321             if (session != null) {
322                 return session;
323             }
324 
325             // If a new session needs to be created.
326             NioSession newSession = newSession(this, handle, remoteAddress);
327             getSessionRecycler().put(newSession);
328             session = newSession;
329         }
330 
331         initSession(session, null, null);
332 
333         try {
334             this.getFilterChainBuilder().buildFilterChain(session.getFilterChain());
335             getListeners().fireSessionCreated(session);
336         } catch (Exception e) {
337             ExceptionMonitor.getInstance().exceptionCaught(e);
338         }
339 
340         return session;
341     }
342 
343     private void flushSessions(long currentTime) {
344         for (;;) {
345             NioSession session = flushingSessions.poll();
346 
347             if (session == null) {
348                 break;
349             }
350 
351             // Reset the Schedule for flush flag for this session,
352             // as we are flushing it now
353             session.unscheduledForFlush();
354 
355             try {
356                 boolean flushedAll = flush(session, currentTime);
357 
358                 if (flushedAll && !session.getWriteRequestQueue().isEmpty(session) && !session.isScheduledForFlush()) {
359                     scheduleFlush(session);
360                 }
361             } catch (Exception e) {
362                 session.getFilterChain().fireExceptionCaught(e);
363             }
364         }
365     }
366 
367     private boolean flush(NioSession session, long currentTime) throws Exception {
368         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
369         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
370                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
371 
372         int writtenBytes = 0;
373 
374         try {
375             for (;;) {
376                 WriteRequest req = session.getCurrentWriteRequest();
377 
378                 if (req == null) {
379                     req = writeRequestQueue.poll(session);
380 
381                     if (req == null) {
382                         setInterestedInWrite(session, false);
383                         break;
384                     }
385 
386                     session.setCurrentWriteRequest(req);
387                 }
388 
389                 IoBuffer buf = (IoBuffer) req.getMessage();
390 
391                 if (buf.remaining() == 0) {
392                     // Clear and fire event
393                     session.setCurrentWriteRequest(null);
394                     buf.reset();
395                     session.getFilterChain().fireMessageSent(req);
396                     continue;
397                 }
398 
399                 SocketAddress destination = req.getDestination();
400 
401                 if (destination == null) {
402                     destination = session.getRemoteAddress();
403                 }
404 
405                 int localWrittenBytes = send(session, buf, destination);
406 
407                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
408                     // Kernel buffer is full or wrote too much
409                     setInterestedInWrite(session, true);
410 
411                     return false;
412                 } else {
413                     setInterestedInWrite(session, false);
414 
415                     // Clear and fire event
416                     session.setCurrentWriteRequest(null);
417                     writtenBytes += localWrittenBytes;
418                     buf.reset();
419                     session.getFilterChain().fireMessageSent(req);
420                 }
421             }
422         } finally {
423             session.increaseWrittenBytes(writtenBytes, currentTime);
424         }
425 
426         return true;
427     }
428 
429     private int unregisterHandles() {
430         int nHandles = 0;
431 
432         for (;;) {
433             AcceptorOperationFuture request = cancelQueue.poll();
434             if (request == null) {
435                 break;
436             }
437 
438             // close the channels
439             for (SocketAddress socketAddress : request.getLocalAddresses()) {
440                 DatagramChannel handle = boundHandles.remove(socketAddress);
441 
442                 if (handle == null) {
443                     continue;
444                 }
445 
446                 try {
447                     close(handle);
448                     wakeup(); // wake up again to trigger thread death
449                 } catch (Exception e) {
450                     ExceptionMonitor.getInstance().exceptionCaught(e);
451                 } finally {
452                     nHandles++;
453                 }
454             }
455 
456             request.setDone();
457         }
458 
459         return nHandles;
460     }
461 
462     private void notifyIdleSessions(long currentTime) {
463         // process idle sessions
464         if (currentTime - lastIdleCheckTime >= 1000) {
465             lastIdleCheckTime = currentTime;
466             AbstractIoSession.notifyIdleness(getListeners().getManagedSessions().values().iterator(), currentTime);
467         }
468     }
469 
470     /**
471      * Starts the inner Acceptor thread.
472      */
473     private void startupAcceptor() throws InterruptedException {
474         if (!selectable) {
475             registerQueue.clear();
476             cancelQueue.clear();
477             flushingSessions.clear();
478         }
479 
480         lock.acquire();
481 
482         if (acceptor == null) {
483             acceptor = new Acceptor();
484             executeWorker(acceptor);
485         } else {
486             lock.release();
487         }
488     }
489 
490     protected void init() throws Exception {
491         this.selector = Selector.open();
492     }
493 
494     /**
495      * {@inheritDoc}
496      */
497     public void add(NioSession session) {
498         // Nothing to do for UDP
499     }
500 
501     /**
502      * {@inheritDoc}
503      */
504     @Override
505     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
506         // Create a bind request as a Future operation. When the selector
507         // have handled the registration, it will signal this future.
508         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
509 
510         // adds the Registration request to the queue for the Workers
511         // to handle
512         registerQueue.add(request);
513 
514         // creates the Acceptor instance and has the local
515         // executor kick it off.
516         startupAcceptor();
517 
518         // As we just started the acceptor, we have to unblock the select()
519         // in order to process the bind request we just have added to the
520         // registerQueue.
521         try {
522             lock.acquire();
523 
524             // Wait a bit to give a chance to the Acceptor thread to do the select()
525             Thread.sleep(10);
526             wakeup();
527         } finally {
528             lock.release();
529         }
530 
531         // Now, we wait until this request is completed.
532         request.awaitUninterruptibly();
533 
534         if (request.getException() != null) {
535             throw request.getException();
536         }
537 
538         // Update the local addresses.
539         // setLocalAddresses() shouldn't be called from the worker thread
540         // because of deadlock.
541         Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
542 
543         for (DatagramChannel handle : boundHandles.values()) {
544             newLocalAddresses.add(localAddress(handle));
545         }
546 
547         return newLocalAddresses;
548     }
549 
550     protected void close(DatagramChannel handle) throws Exception {
551         SelectionKey key = handle.keyFor(selector);
552 
553         if (key != null) {
554             key.cancel();
555         }
556 
557         handle.disconnect();
558         handle.close();
559     }
560 
561     protected void destroy() throws Exception {
562         if (selector != null) {
563             selector.close();
564         }
565     }
566 
567     /**
568      * {@inheritDoc}
569      */
570     @Override
571     protected void dispose0() throws Exception {
572         unbind();
573         startupAcceptor();
574         wakeup();
575     }
576 
577     /**
578      * {@inheritDoc}
579      */
580     public void flush(NioSession session) {
581         if (scheduleFlush(session)) {
582             wakeup();
583         }
584     }
585 
586     @Override
587     public InetSocketAddress getDefaultLocalAddress() {
588         return (InetSocketAddress) super.getDefaultLocalAddress();
589     }
590 
591     @Override
592     public InetSocketAddress getLocalAddress() {
593         return (InetSocketAddress) super.getLocalAddress();
594     }
595 
596     /**
597      * {@inheritDoc}
598      */
599     public DatagramSessionConfig getSessionConfig() {
600         return (DatagramSessionConfig) sessionConfig;
601     }
602 
603     public final IoSessionRecycler getSessionRecycler() {
604         return sessionRecycler;
605     }
606 
607     public TransportMetadata getTransportMetadata() {
608         return NioDatagramSession.METADATA;
609     }
610 
611     protected boolean isReadable(DatagramChannel handle) {
612         SelectionKey key = handle.keyFor(selector);
613 
614         if ((key == null) || (!key.isValid())) {
615             return false;
616         }
617 
618         return key.isReadable();
619     }
620 
621     protected boolean isWritable(DatagramChannel handle) {
622         SelectionKey key = handle.keyFor(selector);
623 
624         if ((key == null) || (!key.isValid())) {
625             return false;
626         }
627 
628         return key.isWritable();
629     }
630 
631     protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
632         InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
633         InetAddress inetAddress = inetSocketAddress.getAddress();
634 
635         if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
636             // Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
637             // even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
638             // ANY address in the map.
639             byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
640             byte[] ipV4Address = new byte[4];
641 
642             System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);
643 
644             InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
645             return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
646         } else {
647             return inetSocketAddress;
648         }
649     }
650 
651     protected NioSession newSession(IoProcessor<NioSession> processor, DatagramChannel handle,
652             SocketAddress remoteAddress) {
653         SelectionKey key = handle.keyFor(selector);
654 
655         if ((key == null) || (!key.isValid())) {
656             return null;
657         }
658 
659         NioDatagramSession newSession = new NioDatagramSession(this, handle, processor, remoteAddress);
660         newSession.setSelectionKey(key);
661 
662         return newSession;
663     }
664 
665     /**
666      * {@inheritDoc}
667      */
668     public final IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
669         if (isDisposing()) {
670             throw new IllegalStateException("The Acceptor is being disposed.");
671         }
672 
673         if (remoteAddress == null) {
674             throw new IllegalArgumentException("remoteAddress");
675         }
676 
677         synchronized (bindLock) {
678             if (!isActive()) {
679                 throw new IllegalStateException("Can't create a session from a unbound service.");
680             }
681 
682             try {
683                 return newSessionWithoutLock(remoteAddress, localAddress);
684             } catch (RuntimeException e) {
685                 throw e;
686             } catch (Error e) {
687                 throw e;
688             } catch (Exception e) {
689                 throw new RuntimeIoException("Failed to create a session.", e);
690             }
691         }
692     }
693 
694     protected DatagramChannel open(SocketAddress localAddress) throws Exception {
695         final DatagramChannel ch = DatagramChannel.open();
696         boolean success = false;
697         try {
698             new NioDatagramSessionConfig(ch).setAll(getSessionConfig());
699             ch.configureBlocking(false);
700 
701             try {
702                 ch.socket().bind(localAddress);
703             } catch (IOException ioe) {
704                 // Add some info regarding the address we try to bind to the
705                 // message
706                 String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
707                         + ioe.getMessage();
708                 Exception e = new IOException(newMessage);
709                 e.initCause(ioe.getCause());
710 
711                 // And close the channel
712                 ch.close();
713 
714                 throw e;
715             }
716 
717             ch.register(selector, SelectionKey.OP_READ);
718             success = true;
719         } finally {
720             if (!success) {
721                 close(ch);
722             }
723         }
724 
725         return ch;
726     }
727 
728     protected SocketAddress receive(DatagramChannel handle, IoBuffer buffer) throws Exception {
729         return handle.receive(buffer.buf());
730     }
731 
732     /**
733      * {@inheritDoc}
734      */
735     public void remove(NioSession session) {
736         getSessionRecycler().remove(session);
737         getListeners().fireSessionDestroyed(session);
738     }
739 
740     protected int select() throws Exception {
741         return selector.select();
742     }
743 
744     protected int select(long timeout) throws Exception {
745         return selector.select(timeout);
746     }
747 
748     protected Set<SelectionKey> selectedHandles() {
749         return selector.selectedKeys();
750     }
751 
752     protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
753         return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
754     }
755 
756     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
757         setDefaultLocalAddress((SocketAddress) localAddress);
758     }
759 
760     protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
761         SelectionKey key = session.getSelectionKey();
762 
763         if (key == null) {
764             return;
765         }
766 
767         int newInterestOps = key.interestOps();
768 
769         if (isInterested) {
770             newInterestOps |= SelectionKey.OP_WRITE;
771         } else {
772             newInterestOps &= ~SelectionKey.OP_WRITE;
773         }
774 
775         key.interestOps(newInterestOps);
776     }
777 
778     public final void setSessionRecycler(IoSessionRecycler sessionRecycler) {
779         synchronized (bindLock) {
780             if (isActive()) {
781                 throw new IllegalStateException("sessionRecycler can't be set while the acceptor is bound.");
782             }
783 
784             if (sessionRecycler == null) {
785                 sessionRecycler = DEFAULT_RECYCLER;
786             }
787 
788             this.sessionRecycler = sessionRecycler;
789         }
790     }
791 
792     /**
793      * {@inheritDoc}
794      */
795     @Override
796     protected final void unbind0(List<? extends SocketAddress> localAddresses) throws Exception {
797         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
798 
799         cancelQueue.add(request);
800         startupAcceptor();
801         wakeup();
802 
803         request.awaitUninterruptibly();
804 
805         if (request.getException() != null) {
806             throw request.getException();
807         }
808     }
809 
810     /**
811      * {@inheritDoc}
812      */
813     public void updateTrafficControl(NioSession session) {
814         throw new UnsupportedOperationException();
815     }
816 
817     protected void wakeup() {
818         selector.wakeup();
819     }
820 
821     /**
822      * {@inheritDoc}
823      */
824     public void write(NioSession session, WriteRequest writeRequest) {
825         // We will try to write the message directly
826         long currentTime = System.currentTimeMillis();
827         final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
828         final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
829                 + (session.getConfig().getMaxReadBufferSize() >>> 1);
830 
831         int writtenBytes = 0;
832 
833         // Deal with the special case of a Message marker (no bytes in the request)
834         // We just have to return after having calle dthe messageSent event
835         IoBuffer buf = (IoBuffer) writeRequest.getMessage();
836 
837         if (buf.remaining() == 0) {
838             // Clear and fire event
839             session.setCurrentWriteRequest(null);
840             buf.reset();
841             session.getFilterChain().fireMessageSent(writeRequest);
842             return;
843         }
844 
845         // Now, write the data
846         try {
847             for (;;) {
848                 if (writeRequest == null) {
849                     writeRequest = writeRequestQueue.poll(session);
850 
851                     if (writeRequest == null) {
852                         setInterestedInWrite(session, false);
853                         break;
854                     }
855 
856                     session.setCurrentWriteRequest(writeRequest);
857                 }
858 
859                 buf = (IoBuffer) writeRequest.getMessage();
860 
861                 if (buf.remaining() == 0) {
862                     // Clear and fire event
863                     session.setCurrentWriteRequest(null);
864                     buf.reset();
865                     session.getFilterChain().fireMessageSent(writeRequest);
866                     continue;
867                 }
868 
869                 SocketAddress destination = writeRequest.getDestination();
870 
871                 if (destination == null) {
872                     destination = session.getRemoteAddress();
873                 }
874 
875                 int localWrittenBytes = send(session, buf, destination);
876 
877                 if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
878                     // Kernel buffer is full or wrote too much
879                     setInterestedInWrite(session, true);
880 
881                     session.getWriteRequestQueue().offer(session, writeRequest);
882                     scheduleFlush(session);
883                 } else {
884                     setInterestedInWrite(session, false);
885 
886                     // Clear and fire event
887                     session.setCurrentWriteRequest(null);
888                     writtenBytes += localWrittenBytes;
889                     buf.reset();
890                     session.getFilterChain().fireMessageSent(writeRequest);
891 
892                     break;
893                 }
894             }
895         } catch (Exception e) {
896             session.getFilterChain().fireExceptionCaught(e);
897         } finally {
898             session.increaseWrittenBytes(writtenBytes, currentTime);
899         }
900     }
901 }