View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport;
21  
22  import java.net.SocketAddress;
23  import java.nio.charset.StandardCharsets;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.future.ConnectFuture;
27  import org.apache.mina.core.service.IoAcceptor;
28  import org.apache.mina.core.service.IoHandler;
29  import org.apache.mina.core.service.IoHandlerAdapter;
30  import org.apache.mina.core.service.TransportMetadata;
31  import org.apache.mina.core.session.IoSession;
32  import org.junit.After;
33  import org.junit.Before;
34  import org.junit.Test;
35  
36  import static org.junit.Assert.assertEquals;
37  import static org.junit.Assert.assertFalse;
38  import static org.junit.Assert.assertTrue;
39  
40  /**
41   * Abstract base class for testing suspending and resuming reads and
42   * writes.
43   *
44   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
45   */
46  public abstract class AbstractTrafficControlTest {
47  
48      protected int port;
49  
50      protected IoAcceptor acceptor;
51  
52      protected TransportMetadata transportType;
53  
54      public AbstractTrafficControlTest(IoAcceptor acceptor) {
55          this.acceptor = acceptor;
56      }
57  
58      @Before
59      public void setUp() throws Exception {
60          acceptor.setHandler(new ServerIoHandler());
61          acceptor.bind(createServerSocketAddress(0));
62          port = getPort(acceptor.getLocalAddress());
63      }
64  
65      @After
66      public void tearDown() throws Exception {
67          acceptor.unbind();
68          acceptor.dispose();
69      }
70  
71      protected abstract ConnectFuture connect(int port, IoHandler handler) throws Exception;
72  
73      protected abstract SocketAddress createServerSocketAddress(int port);
74  
75      protected abstract int getPort(SocketAddress address);
76  
77      @Test
78      public void testSuspendResumeReadWrite() throws Exception {
79          ConnectFuture future = connect(port, new ClientIoHandler());
80          future.awaitUninterruptibly();
81          IoSession session = future.getSession();
82  
83          // We wait for the sessionCreated() event is fired because we
84          // cannot guarantee that it is invoked already.
85          while (session.getAttribute("lock") == null) {
86              Thread.yield();
87          }
88  
89          Object lock = session.getAttribute("lock");
90          synchronized (lock) {
91  
92              write(session, "1");
93              assertEquals('1', read(session));
94              assertEquals("1", getReceived(session));
95              assertEquals("1", getSent(session));
96  
97              session.suspendRead();
98  
99              Thread.sleep(100);
100 
101             write(session, "2");
102             assertFalse(canRead(session));
103             assertEquals("1", getReceived(session));
104             assertEquals("12", getSent(session));
105 
106             session.suspendWrite();
107 
108             Thread.sleep(100);
109 
110             write(session, "3");
111             assertFalse(canRead(session));
112             assertEquals("1", getReceived(session));
113             assertEquals("12", getSent(session));
114 
115             session.resumeRead();
116 
117             Thread.sleep(100);
118 
119             write(session, "4");
120             assertEquals('2', read(session));
121             assertEquals("12", getReceived(session));
122             assertEquals("12", getSent(session));
123 
124             session.resumeWrite();
125 
126             Thread.sleep(100);
127 
128             assertEquals('3', read(session));
129             assertEquals('4', read(session));
130 
131             write(session, "5");
132             assertEquals('5', read(session));
133             assertEquals("12345", getReceived(session));
134             assertEquals("12345", getSent(session));
135 
136             session.suspendWrite();
137 
138             Thread.sleep(100);
139 
140             write(session, "6");
141             assertFalse(canRead(session));
142             assertEquals("12345", getReceived(session));
143             assertEquals("12345", getSent(session));
144 
145             session.suspendRead();
146             session.resumeWrite();
147 
148             Thread.sleep(100);
149 
150             write(session, "7");
151             assertFalse(canRead(session));
152             assertEquals("12345", getReceived(session));
153             assertEquals("1234567", getSent(session));
154 
155             session.resumeRead();
156 
157             Thread.sleep(100);
158 
159             assertEquals('6', read(session));
160             assertEquals('7', read(session));
161 
162             assertEquals("1234567", getReceived(session));
163             assertEquals("1234567", getSent(session));
164 
165         }
166 
167         session.closeNow().awaitUninterruptibly();
168     }
169 
170     private void write(IoSession session, String s) throws Exception {
171         session.write(IoBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII)));
172     }
173 
174     private int read(IoSession session) throws Exception {
175         int pos = ((Integer) session.getAttribute("pos")).intValue();
176         for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
177             Object lock = session.getAttribute("lock");
178             lock.wait(200);
179         }
180         session.setAttribute("pos", new Integer(pos + 1));
181         String received = getReceived(session);
182         assertTrue(received.length() > pos);
183         return getReceived(session).charAt(pos);
184     }
185 
186     private boolean canRead(IoSession session) throws Exception {
187         int pos = ((Integer) session.getAttribute("pos")).intValue();
188         Object lock = session.getAttribute("lock");
189         lock.wait(250);
190         String received = getReceived(session);
191         return pos < received.length();
192     }
193 
194     private String getReceived(IoSession session) throws Exception {
195         return session.getAttribute("received").toString();
196     }
197 
198     private String getSent(IoSession session) throws Exception {
199         return session.getAttribute("sent").toString();
200     }
201 
202     private static class ClientIoHandler extends IoHandlerAdapter {
203         /**
204          * Default constructor
205          */
206         public ClientIoHandler() {
207             super();
208         }
209 
210         @Override
211         public void sessionCreated(IoSession session) throws Exception {
212             super.sessionCreated(session);
213             session.setAttribute("pos", new Integer(0));
214             session.setAttribute("received", new StringBuffer());
215             session.setAttribute("sent", new StringBuffer());
216             session.setAttribute("lock", new Object());
217         }
218 
219         @Override
220         public void messageReceived(IoSession session, Object message) throws Exception {
221             IoBuffer buffer = (IoBuffer) message;
222             byte[] data = new byte[buffer.remaining()];
223             buffer.get(data);
224             Object lock = session.getAttribute("lock");
225             synchronized (lock) {
226                 StringBuffer sb = (StringBuffer) session.getAttribute("received");
227                 sb.append(new String(data, "ASCII"));
228                 lock.notifyAll();
229             }
230         }
231 
232         @Override
233         public void messageSent(IoSession session, Object message) throws Exception {
234             IoBuffer buffer = (IoBuffer) message;
235             buffer.rewind();
236             byte[] data = new byte[buffer.remaining()];
237             buffer.get(data);
238             StringBuffer sb = (StringBuffer) session.getAttribute("sent");
239             sb.append(new String(data, "ASCII"));
240         }
241 
242     }
243 
244     private static class ServerIoHandler extends IoHandlerAdapter {
245         /**
246          * Default constructor
247          */
248         public ServerIoHandler() {
249             super();
250         }
251 
252         @Override
253         public void messageReceived(IoSession session, Object message) throws Exception {
254             // Just echo the received bytes.
255             IoBuffer rb = (IoBuffer) message;
256             IoBuffer wb = IoBuffer.allocate(rb.remaining());
257             wb.put(rb);
258             wb.flip();
259             session.write(wb);
260         }
261     }
262 }