1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.apache.mina.transport.socket.apr;
21
22 import java.io.IOException;
23 import java.net.InetSocketAddress;
24 import java.net.SocketAddress;
25 import java.util.Iterator;
26 import java.util.Queue;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.Executor;
29
30 import org.apache.mina.core.RuntimeIoException;
31 import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
32 import org.apache.mina.core.service.IoAcceptor;
33 import org.apache.mina.core.service.IoProcessor;
34 import org.apache.mina.core.service.IoService;
35 import org.apache.mina.core.service.SimpleIoProcessorPool;
36 import org.apache.mina.core.service.TransportMetadata;
37 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
38 import org.apache.tomcat.jni.Address;
39 import org.apache.tomcat.jni.Poll;
40 import org.apache.tomcat.jni.Pool;
41 import org.apache.tomcat.jni.Socket;
42 import org.apache.tomcat.jni.Status;
43
44
45
46
47
48
49 public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> {
50
51
52
53
54 private static final int APR_TIMEUP_ERROR = -120001;
55
56 private static final int POLLSET_SIZE = 1024;
57
58 private final Object wakeupLock = new Object();
59
60 private volatile long wakeupSocket;
61
62 private volatile boolean toBeWakenUp;
63
64 private volatile long pool;
65
66 private volatile long pollset;
67
68 private final long[] polledSockets = new long[POLLSET_SIZE << 1];
69
70 private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
71
72
73
74
75 public AprSocketAcceptor() {
76 super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
77 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
78 }
79
80
81
82
83
84
85
86
87 public AprSocketAcceptor(int processorCount) {
88 super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
89 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
90 }
91
92
93
94
95
96
97
98 public AprSocketAcceptor(IoProcessor<AprSession> processor) {
99 super(new DefaultSocketSessionConfig(), processor);
100 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
101 }
102
103
104
105
106
107
108
109
110 public AprSocketAcceptor(Executor executor, IoProcessor<AprSession> processor) {
111 super(new DefaultSocketSessionConfig(), executor, processor);
112 ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
113 }
114
115
116
117
118 @Override
119 protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
120 long s = Socket.accept(handle);
121 boolean success = false;
122 try {
123 AprSession result = new AprSocketSession(this, processor, s);
124 success = true;
125 return result;
126 } finally {
127 if (!success) {
128 Socket.close(s);
129 }
130 }
131 }
132
133
134
135
136 @Override
137 protected Long open(SocketAddress localAddress) throws Exception {
138 InetSocketAddress la = (InetSocketAddress) localAddress;
139 long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
140
141 boolean success = false;
142 try {
143 int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
144 if (result != Status.APR_SUCCESS) {
145 throwException(result);
146 }
147 result = Socket.timeoutSet(handle, 0);
148 if (result != Status.APR_SUCCESS) {
149 throwException(result);
150 }
151
152
153 result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress() ? 1 : 0);
154 if (result != Status.APR_SUCCESS) {
155 throwException(result);
156 }
157 result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
158 if (result != Status.APR_SUCCESS) {
159 throwException(result);
160 }
161
162
163 long sa;
164 if (la != null) {
165 if (la.getAddress() == null) {
166 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
167 } else {
168 sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
169 }
170 } else {
171 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
172 }
173
174 result = Socket.bind(handle, sa);
175 if (result != Status.APR_SUCCESS) {
176 throwException(result);
177 }
178 result = Socket.listen(handle, getBacklog());
179 if (result != Status.APR_SUCCESS) {
180 throwException(result);
181 }
182
183 result = Poll.add(pollset, handle, Poll.APR_POLLIN);
184 if (result != Status.APR_SUCCESS) {
185 throwException(result);
186 }
187 success = true;
188 } finally {
189 if (!success) {
190 close(handle);
191 }
192 }
193 return handle;
194 }
195
196
197
198
199 @Override
200 protected void init() throws Exception {
201
202 pool = Pool.create(AprLibrary.getInstance().getRootPool());
203
204 wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
205
206 pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
207
208 if (pollset <= 0) {
209 pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
210 }
211
212 if (pollset <= 0) {
213 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
214 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
215 }
216 }
217 }
218
219
220
221
222 @Override
223 protected void destroy() throws Exception {
224 if (wakeupSocket > 0) {
225 Socket.close(wakeupSocket);
226 }
227 if (pollset > 0) {
228 Poll.destroy(pollset);
229 }
230 if (pool > 0) {
231 Pool.destroy(pool);
232 }
233 }
234
235
236
237
238 @Override
239 protected SocketAddress localAddress(Long handle) throws Exception {
240 long la = Address.get(Socket.APR_LOCAL, handle);
241 return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
242 }
243
244
245
246
247 @Override
248 protected int select() throws Exception {
249 int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
250 if (rv <= 0) {
251
252
253 if (rv != APR_TIMEUP_ERROR) {
254
255 throwException(rv);
256 }
257
258 rv = Poll.maintain(pollset, polledSockets, true);
259 if (rv > 0) {
260 for (int i = 0; i < rv; i++) {
261 Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
262 }
263 } else if (rv < 0) {
264 throwException(rv);
265 }
266
267 return 0;
268 } else {
269 rv <<= 1;
270 if (!polledHandles.isEmpty()) {
271 polledHandles.clear();
272 }
273
274 for (int i = 0; i < rv; i++) {
275 long flag = polledSockets[i];
276 long socket = polledSockets[++i];
277 if (socket == wakeupSocket) {
278 synchronized (wakeupLock) {
279 Poll.remove(pollset, wakeupSocket);
280 toBeWakenUp = false;
281 }
282 continue;
283 }
284
285 if ((flag & Poll.APR_POLLIN) != 0) {
286 polledHandles.add(socket);
287 }
288 }
289 return polledHandles.size();
290 }
291 }
292
293
294
295
296 @Override
297 protected Iterator<Long> selectedHandles() {
298 return polledHandles.iterator();
299 }
300
301
302
303
304 @Override
305 protected void close(Long handle) throws Exception {
306 Poll.remove(pollset, handle);
307 int result = Socket.close(handle);
308 if (result != Status.APR_SUCCESS) {
309 throwException(result);
310 }
311 }
312
313
314
315
316 @Override
317 protected void wakeup() {
318 if (toBeWakenUp) {
319 return;
320 }
321
322
323 synchronized (wakeupLock) {
324 toBeWakenUp = true;
325 Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
326 }
327 }
328
329
330
331
332 @Override
333 public InetSocketAddress getLocalAddress() {
334 return (InetSocketAddress) super.getLocalAddress();
335 }
336
337
338
339
340 @Override
341 public InetSocketAddress getDefaultLocalAddress() {
342 return (InetSocketAddress) super.getDefaultLocalAddress();
343 }
344
345
346
347
348 public void setDefaultLocalAddress(InetSocketAddress localAddress) {
349 super.setDefaultLocalAddress(localAddress);
350 }
351
352
353
354
355 public TransportMetadata getTransportMetadata() {
356 return AprSocketSession.METADATA;
357 }
358
359
360
361
362
363
364 private void throwException(int code) throws IOException {
365 throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
366 }
367 }