1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport;
21
22 import java.net.SocketAddress;
23 import java.nio.charset.StandardCharsets;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.future.ConnectFuture;
27 import org.apache.mina.core.service.IoAcceptor;
28 import org.apache.mina.core.service.IoHandler;
29 import org.apache.mina.core.service.IoHandlerAdapter;
30 import org.apache.mina.core.service.TransportMetadata;
31 import org.apache.mina.core.session.IoSession;
32 import org.junit.After;
33 import org.junit.Before;
34 import org.junit.Test;
35
36 import static org.junit.Assert.assertEquals;
37 import static org.junit.Assert.assertFalse;
38 import static org.junit.Assert.assertTrue;
39
40
41
42
43
44
45
46 public abstract class AbstractTrafficControlTest {
47
48 protected int port;
49
50 protected IoAcceptor acceptor;
51
52 protected TransportMetadata transportType;
53
54 public AbstractTrafficControlTest(IoAcceptor acceptor) {
55 this.acceptor = acceptor;
56 }
57
58 @Before
59 public void setUp() throws Exception {
60 acceptor.setHandler(new ServerIoHandler());
61 acceptor.bind(createServerSocketAddress(0));
62 port = getPort(acceptor.getLocalAddress());
63 }
64
65 @After
66 public void tearDown() throws Exception {
67 acceptor.unbind();
68 acceptor.dispose();
69 }
70
71 protected abstract ConnectFuture connect(int port, IoHandler handler) throws Exception;
72
73 protected abstract SocketAddress createServerSocketAddress(int port);
74
75 protected abstract int getPort(SocketAddress address);
76
77 @Test
78 public void testSuspendResumeReadWrite() throws Exception {
79 ConnectFuture future = connect(port, new ClientIoHandler());
80 future.awaitUninterruptibly();
81 IoSession session = future.getSession();
82
83
84
85 while (session.getAttribute("lock") == null) {
86 Thread.yield();
87 }
88
89 Object lock = session.getAttribute("lock");
90 synchronized (lock) {
91
92 write(session, "1");
93 assertEquals('1', read(session));
94 assertEquals("1", getReceived(session));
95 assertEquals("1", getSent(session));
96
97 session.suspendRead();
98
99 Thread.sleep(100);
100
101 write(session, "2");
102 assertFalse(canRead(session));
103 assertEquals("1", getReceived(session));
104 assertEquals("12", getSent(session));
105
106 session.suspendWrite();
107
108 Thread.sleep(100);
109
110 write(session, "3");
111 assertFalse(canRead(session));
112 assertEquals("1", getReceived(session));
113 assertEquals("12", getSent(session));
114
115 session.resumeRead();
116
117 Thread.sleep(100);
118
119 write(session, "4");
120 assertEquals('2', read(session));
121 assertEquals("12", getReceived(session));
122 assertEquals("12", getSent(session));
123
124 session.resumeWrite();
125
126 Thread.sleep(100);
127
128 assertEquals('3', read(session));
129 assertEquals('4', read(session));
130
131 write(session, "5");
132 assertEquals('5', read(session));
133 assertEquals("12345", getReceived(session));
134 assertEquals("12345", getSent(session));
135
136 session.suspendWrite();
137
138 Thread.sleep(100);
139
140 write(session, "6");
141 assertFalse(canRead(session));
142 assertEquals("12345", getReceived(session));
143 assertEquals("12345", getSent(session));
144
145 session.suspendRead();
146 session.resumeWrite();
147
148 Thread.sleep(100);
149
150 write(session, "7");
151 assertFalse(canRead(session));
152 assertEquals("12345", getReceived(session));
153 assertEquals("1234567", getSent(session));
154
155 session.resumeRead();
156
157 Thread.sleep(100);
158
159 assertEquals('6', read(session));
160 assertEquals('7', read(session));
161
162 assertEquals("1234567", getReceived(session));
163 assertEquals("1234567", getSent(session));
164
165 }
166
167 session.closeNow().awaitUninterruptibly();
168 }
169
170 private void write(IoSession session, String s) throws Exception {
171 session.write(IoBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII)));
172 }
173
174 private int read(IoSession session) throws Exception {
175 int pos = ((Integer) session.getAttribute("pos")).intValue();
176 for (int i = 0; i < 10 && pos == getReceived(session).length(); i++) {
177 Object lock = session.getAttribute("lock");
178 lock.wait(200);
179 }
180 session.setAttribute("pos", new Integer(pos + 1));
181 String received = getReceived(session);
182 assertTrue(received.length() > pos);
183 return getReceived(session).charAt(pos);
184 }
185
186 private boolean canRead(IoSession session) throws Exception {
187 int pos = ((Integer) session.getAttribute("pos")).intValue();
188 Object lock = session.getAttribute("lock");
189 lock.wait(250);
190 String received = getReceived(session);
191 return pos < received.length();
192 }
193
194 private String getReceived(IoSession session) throws Exception {
195 return session.getAttribute("received").toString();
196 }
197
198 private String getSent(IoSession session) throws Exception {
199 return session.getAttribute("sent").toString();
200 }
201
202 private static class ClientIoHandler extends IoHandlerAdapter {
203
204
205
206 public ClientIoHandler() {
207 super();
208 }
209
210 @Override
211 public void sessionCreated(IoSession session) throws Exception {
212 super.sessionCreated(session);
213 session.setAttribute("pos", new Integer(0));
214 session.setAttribute("received", new StringBuffer());
215 session.setAttribute("sent", new StringBuffer());
216 session.setAttribute("lock", new Object());
217 }
218
219 @Override
220 public void messageReceived(IoSession session, Object message) throws Exception {
221 IoBuffer buffer = (IoBuffer) message;
222 byte[] data = new byte[buffer.remaining()];
223 buffer.get(data);
224 Object lock = session.getAttribute("lock");
225 synchronized (lock) {
226 StringBuffer sb = (StringBuffer) session.getAttribute("received");
227 sb.append(new String(data, "ASCII"));
228 lock.notifyAll();
229 }
230 }
231
232 @Override
233 public void messageSent(IoSession session, Object message) throws Exception {
234 IoBuffer buffer = (IoBuffer) message;
235 buffer.rewind();
236 byte[] data = new byte[buffer.remaining()];
237 buffer.get(data);
238 StringBuffer sb = (StringBuffer) session.getAttribute("sent");
239 sb.append(new String(data, "ASCII"));
240 }
241
242 }
243
244 private static class ServerIoHandler extends IoHandlerAdapter {
245
246
247
248 public ServerIoHandler() {
249 super();
250 }
251
252 @Override
253 public void messageReceived(IoSession session, Object message) throws Exception {
254
255 IoBuffer rb = (IoBuffer) message;
256 IoBuffer wb = IoBuffer.allocate(rb.remaining());
257 wb.put(rb);
258 wb.flip();
259 session.write(wb);
260 }
261 }
262 }