View Javadoc

/*
 *  Licensed to the Apache Software Foundation (ASF) under one
 *  or more contributor license agreements.  See the NOTICE file
 *  distributed with this work for additional information
 *  regarding copyright ownership.  The ASF licenses this file
 *  to you under the Apache License, Version 2.0 (the
 *  "License"); you may not use this file except in compliance
 *  with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing,
 *  software distributed under the License is distributed on an
 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *  KIND, either express or implied.  See the License for the
 *  specific language governing permissions and limitations
 *  under the License.
 *
 */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.concurrent.Executor;
28  
29  import org.apache.mina.core.RuntimeIoException;
30  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
31  import org.apache.mina.core.service.IoAcceptor;
32  import org.apache.mina.core.service.IoProcessor;
33  import org.apache.mina.core.service.IoService;
34  import org.apache.mina.core.service.SimpleIoProcessorPool;
35  import org.apache.mina.core.service.TransportMetadata;
36  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
37  import org.apache.mina.transport.socket.SocketAcceptor;
38  import org.apache.mina.transport.socket.SocketSessionConfig;
39  import org.apache.mina.util.CircularQueue;
40  import org.apache.tomcat.jni.Address;
41  import org.apache.tomcat.jni.Poll;
42  import org.apache.tomcat.jni.Pool;
43  import org.apache.tomcat.jni.Socket;
44  import org.apache.tomcat.jni.Status;
45  
46  /**
47   * {@link IoAcceptor} for APR based socket transport (TCP/IP).
48   *
49   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
50   */
51  public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> implements SocketAcceptor {
52      /** 
53       * This constant is deduced from the APR code. It is used when the timeout
54       * has expired while doing a poll() operation.
55       */ 
56      private static final int APR_TIMEUP_ERROR = -120001;
57  
58      private static final int POLLSET_SIZE = 1024;
59  
60      private final Object wakeupLock = new Object();
61      private volatile long wakeupSocket;
62      private volatile boolean toBeWakenUp;
63  
64      private int backlog = 50;
65      private boolean reuseAddress = false;
66  
67      private volatile long pool;
68      private volatile long pollset; // socket poller
69      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
70      private final List<Long> polledHandles =
71          new CircularQueue<Long>(POLLSET_SIZE);
72  
73      /**
74       * Constructor for {@link AprSocketAcceptor} using default parameters (multiple thread model).
75       */
76      public AprSocketAcceptor() {
77          super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
78          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
79      }
80  
81      /**
82       * Constructor for {@link AprSocketAcceptor} using default parameters, and 
83       * given number of {@link AprIoProcessor} for multithreading I/O operations.
84       * 
85       * @param processorCount the number of processor to create and place in a
86       * {@link SimpleIoProcessorPool} 
87       */
88      public AprSocketAcceptor(int processorCount) {
89          super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
90          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
91      }
92  
93      /**
94       *  Constructor for {@link AprSocketAcceptor} with default configuration but a
95        *  specific {@link AprIoProcessor}, useful for sharing the same processor over multiple
96        *  {@link IoService} of the same type.
97        * @param processor the processor to use for managing I/O events
98        */
99      public AprSocketAcceptor(IoProcessor<AprSession> processor) {
100         super(new DefaultSocketSessionConfig(), processor);
101         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
102     }
103 
104     /**
105      *  Constructor for {@link AprSocketAcceptor} with a given {@link Executor} for handling 
106      *  connection events and a given {@link AprIoProcessor} for handling I/O events, useful for 
107      *  sharing the same processor and executor over multiple {@link IoService} of the same type.
108      * @param executor the executor for connection
109      * @param processor the processor for I/O operations
110      */
111     public AprSocketAcceptor(Executor executor,
112             IoProcessor<AprSession> processor) {
113         super(new DefaultSocketSessionConfig(), executor, processor);
114         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
115     }
116 
117     /**
118      * {@inheritDoc}
119      */
120     @Override
121     protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
122         long s = Socket.accept(handle);
123         boolean success = false;
124         try {
125             AprSession result = new AprSocketSession(this, processor, s);
126             success = true;
127             return result;
128         } finally {
129             if (!success) {
130                 Socket.close(s);
131             }
132         }
133     }
134 
135     /**
136      * {@inheritDoc}
137      */
138     @Override
139     protected Long open(SocketAddress localAddress) throws Exception {
140         InetSocketAddress la = (InetSocketAddress) localAddress;
141         long handle = Socket.create(
142                 Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
143 
144         boolean success = false;
145         try {
146             int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
147             if (result != Status.APR_SUCCESS) {
148                 throwException(result);
149             }
150             result = Socket.timeoutSet(handle, 0);
151             if (result != Status.APR_SUCCESS) {
152                 throwException(result);
153             }
154 
155             // Configure the server socket,
156             result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress()? 1 : 0);
157             if (result != Status.APR_SUCCESS) {
158                 throwException(result);
159             }
160             result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
161             if (result != Status.APR_SUCCESS) {
162                 throwException(result);
163             }
164 
165             // and bind.
166             long sa;
167             if (la != null) {
168                 if (la.getAddress() == null) {
169                     sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
170                 } else {
171                     sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
172                 }
173             } else {
174                 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
175             }
176 
177             result = Socket.bind(handle, sa);
178             if (result != Status.APR_SUCCESS) {
179                 throwException(result);
180             }
181             result = Socket.listen(handle, getBacklog());
182             if (result != Status.APR_SUCCESS) {
183                 throwException(result);
184             }
185 
186             result = Poll.add(pollset, handle, Poll.APR_POLLIN);
187             if (result != Status.APR_SUCCESS) {
188                 throwException(result);
189             }
190             success = true;
191         } finally {
192             if (!success) {
193                 close(handle);
194             }
195         }
196         return handle;
197     }
198 
199     /**
200      * {@inheritDoc}
201      */
202     @Override
203     protected void init() throws Exception {
204         // initialize a memory pool for APR functions
205         pool = Pool.create(AprLibrary.getInstance().getRootPool());
206 
207         wakeupSocket = Socket.create(
208                 Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
209 
210         pollset = Poll.create(
211                         POLLSET_SIZE,
212                         pool,
213                         Poll.APR_POLLSET_THREADSAFE,
214                         Long.MAX_VALUE);
215 
216         if (pollset <= 0) {
217             pollset = Poll.create(
218                     62,
219                     pool,
220                     Poll.APR_POLLSET_THREADSAFE,
221                     Long.MAX_VALUE);
222         }
223 
224         if (pollset <= 0) {
225             if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
226                 throw new RuntimeIoException(
227                         "Thread-safe pollset is not supported in this platform.");
228             }
229         }
230     }
231 
232     /**
233      * {@inheritDoc}
234      */
235     @Override
236     protected void destroy() throws Exception {
237         if (wakeupSocket > 0) {
238             Socket.close(wakeupSocket);
239         }
240         if (pollset > 0) {
241             Poll.destroy(pollset);
242         }
243         if (pool > 0) {
244             Pool.destroy(pool);
245         }
246     }
247 
248     /**
249      * {@inheritDoc}
250      */
251     @Override
252     protected SocketAddress localAddress(Long handle) throws Exception {
253         long la = Address.get(Socket.APR_LOCAL, handle);
254         return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
255     }
256 
257     /**
258      * {@inheritDoc}
259      */
260     @Override
261     protected int select() throws Exception {
262         int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
263         if (rv <= 0) {
264             // We have had an error. It can simply be that we have reached
265             // the timeout (very unlikely, as we have set it to MAX_INTEGER)
266             if (rv != APR_TIMEUP_ERROR) {
267                 // It's not a timeout being exceeded. Throw the error
268                 throwException(rv);
269             }
270 
271             rv = Poll.maintain(pollset, polledSockets, true);
272             if (rv > 0) {
273                 for (int i = 0; i < rv; i ++) {
274                     Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
275                 }
276             } else if (rv < 0) {
277                 throwException(rv);
278             }
279 
280             return 0;
281         } else {
282             rv <<= 1;
283             if (!polledHandles.isEmpty()) {
284                 polledHandles.clear();
285             }
286 
287             for (int i = 0; i < rv; i ++) {
288                 long flag = polledSockets[i];
289                 long socket = polledSockets[++i];
290                 if (socket == wakeupSocket) {
291                     synchronized (wakeupLock) {
292                         Poll.remove(pollset, wakeupSocket);
293                         toBeWakenUp = false;
294                     }
295                     continue;
296                 }
297 
298                 if ((flag & Poll.APR_POLLIN) != 0) {
299                     polledHandles.add(socket);
300                 }
301             }
302             return polledHandles.size();
303         }
304     }
305 
306     /**
307      * {@inheritDoc}
308      */
309     @Override
310     protected Iterator<Long> selectedHandles() {
311         return polledHandles.iterator();
312     }
313 
314     /**
315      * {@inheritDoc}
316      */
317     @Override
318     protected void close(Long handle) throws Exception {
319         Poll.remove(pollset, handle);
320         int result = Socket.close(handle);
321         if (result != Status.APR_SUCCESS) {
322             throwException(result);
323         }
324     }
325 
326     /**
327      * {@inheritDoc}
328      */
329     @Override
330     protected void wakeup() {
331         if (toBeWakenUp) {
332             return;
333         }
334 
335         // Add a dummy socket to the pollset.
336         synchronized (wakeupLock) {
337             toBeWakenUp = true;
338             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
339         }
340     }
341 
342     /**
343      * {@inheritDoc}
344      */
345     public int getBacklog() {
346         return backlog;
347     }
348 
349     /**
350      * {@inheritDoc}
351      */
352     public boolean isReuseAddress() {
353         return reuseAddress;
354     }
355 
356     /**
357      * {@inheritDoc}
358      */
359     public void setBacklog(int backlog) {
360         synchronized (bindLock) {
361             if (isActive()) {
362                 throw new IllegalStateException(
363                         "backlog can't be set while the acceptor is bound.");
364             }
365 
366             this.backlog = backlog;
367         }
368     }
369 
370     /**
371      * {@inheritDoc}
372      */
373     @Override
374     public InetSocketAddress getLocalAddress() {
375         return (InetSocketAddress) super.getLocalAddress();
376     }
377 
378     /**
379      * {@inheritDoc}
380      */
381     @Override
382     public InetSocketAddress getDefaultLocalAddress() {
383         return (InetSocketAddress) super.getDefaultLocalAddress();
384     }
385 
386     /**
387      * {@inheritDoc}
388      */
389     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
390         super.setDefaultLocalAddress(localAddress);
391     }
392 
393     public void setReuseAddress(boolean reuseAddress) {
394         synchronized (bindLock) {
395             if (isActive()) {
396                 throw new IllegalStateException(
397                         "backlog can't be set while the acceptor is bound.");
398             }
399 
400             this.reuseAddress = reuseAddress;
401         }
402     }
403 
404     /**
405      * {@inheritDoc}
406      */
407     public TransportMetadata getTransportMetadata() {
408         return AprSocketSession.METADATA;
409     }
410 
411     /**
412      * {@inheritDoc}
413      */
414     @Override
415     public SocketSessionConfig getSessionConfig() {
416         return (SocketSessionConfig) super.getSessionConfig();
417     }
418 
419     /**
420      * Convert an APR code into an Exception with the corresponding message
421      * @param code error number
422      * @throws IOException the generated exception
423      */
424     private void throwException(int code) throws IOException {
425         throw new IOException(
426                 org.apache.tomcat.jni.Error.strerror(-code) +
427                 " (code: " + code + ")");
428     }
429 }