View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.net.InetSocketAddress;
24  import java.net.SocketAddress;
25  import java.util.Iterator;
26  import java.util.Queue;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.Executor;
29  
30  import org.apache.mina.core.RuntimeIoException;
31  import org.apache.mina.core.polling.AbstractPollingIoAcceptor;
32  import org.apache.mina.core.service.IoAcceptor;
33  import org.apache.mina.core.service.IoProcessor;
34  import org.apache.mina.core.service.IoService;
35  import org.apache.mina.core.service.SimpleIoProcessorPool;
36  import org.apache.mina.core.service.TransportMetadata;
37  import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
38  import org.apache.tomcat.jni.Address;
39  import org.apache.tomcat.jni.Poll;
40  import org.apache.tomcat.jni.Pool;
41  import org.apache.tomcat.jni.Socket;
42  import org.apache.tomcat.jni.Status;
43  
44  /**
45   * {@link IoAcceptor} for APR based socket transport (TCP/IP).
46   *
47   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
48   */
49  public final class AprSocketAcceptor extends AbstractPollingIoAcceptor<AprSession, Long> {
50      /** 
51       * This constant is deduced from the APR code. It is used when the timeout
52       * has expired while doing a poll() operation.
53       */
54      private static final int APR_TIMEUP_ERROR = -120001;
55  
56      private static final int POLLSET_SIZE = 1024;
57  
58      private final Object wakeupLock = new Object();
59  
60      private volatile long wakeupSocket;
61  
62      private volatile boolean toBeWakenUp;
63  
64      private volatile long pool;
65  
66      private volatile long pollset; // socket poller
67  
68      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
69  
70      private final Queue<Long> polledHandles = new ConcurrentLinkedQueue<Long>();
71  
72      /**
73       * Constructor for {@link AprSocketAcceptor} using default parameters (multiple thread model).
74       */
75      public AprSocketAcceptor() {
76          super(new DefaultSocketSessionConfig(), AprIoProcessor.class);
77          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
78      }
79  
80      /**
81       * Constructor for {@link AprSocketAcceptor} using default parameters, and 
82       * given number of {@link AprIoProcessor} for multithreading I/O operations.
83       * 
84       * @param processorCount the number of processor to create and place in a
85       * {@link SimpleIoProcessorPool} 
86       */
87      public AprSocketAcceptor(int processorCount) {
88          super(new DefaultSocketSessionConfig(), AprIoProcessor.class, processorCount);
89          ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
90      }
91  
92      /**
93       *  Constructor for {@link AprSocketAcceptor} with default configuration but a
94        *  specific {@link AprIoProcessor}, useful for sharing the same processor over multiple
95        *  {@link IoService} of the same type.
96        * @param processor the processor to use for managing I/O events
97        */
98      public AprSocketAcceptor(IoProcessor<AprSession> processor) {
99          super(new DefaultSocketSessionConfig(), processor);
100         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
101     }
102 
103     /**
104      *  Constructor for {@link AprSocketAcceptor} with a given {@link Executor} for handling 
105      *  connection events and a given {@link AprIoProcessor} for handling I/O events, useful for 
106      *  sharing the same processor and executor over multiple {@link IoService} of the same type.
107      * @param executor the executor for connection
108      * @param processor the processor for I/O operations
109      */
110     public AprSocketAcceptor(Executor executor, IoProcessor<AprSession> processor) {
111         super(new DefaultSocketSessionConfig(), executor, processor);
112         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
113     }
114 
115     /**
116      * {@inheritDoc}
117      */
118     @Override
119     protected AprSession accept(IoProcessor<AprSession> processor, Long handle) throws Exception {
120         long s = Socket.accept(handle);
121         boolean success = false;
122         try {
123             AprSession result = new AprSocketSession(this, processor, s);
124             success = true;
125             return result;
126         } finally {
127             if (!success) {
128                 Socket.close(s);
129             }
130         }
131     }
132 
133     /**
134      * {@inheritDoc}
135      */
136     @Override
137     protected Long open(SocketAddress localAddress) throws Exception {
138         InetSocketAddress la = (InetSocketAddress) localAddress;
139         long handle = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM, Socket.APR_PROTO_TCP, pool);
140 
141         boolean success = false;
142         try {
143             int result = Socket.optSet(handle, Socket.APR_SO_NONBLOCK, 1);
144             if (result != Status.APR_SUCCESS) {
145                 throwException(result);
146             }
147             result = Socket.timeoutSet(handle, 0);
148             if (result != Status.APR_SUCCESS) {
149                 throwException(result);
150             }
151 
152             // Configure the server socket,
153             result = Socket.optSet(handle, Socket.APR_SO_REUSEADDR, isReuseAddress() ? 1 : 0);
154             if (result != Status.APR_SUCCESS) {
155                 throwException(result);
156             }
157             result = Socket.optSet(handle, Socket.APR_SO_RCVBUF, getSessionConfig().getReceiveBufferSize());
158             if (result != Status.APR_SUCCESS) {
159                 throwException(result);
160             }
161 
162             // and bind.
163             long sa;
164             if (la != null) {
165                 if (la.getAddress() == null) {
166                     sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, la.getPort(), 0, pool);
167                 } else {
168                     sa = Address.info(la.getAddress().getHostAddress(), Socket.APR_INET, la.getPort(), 0, pool);
169                 }
170             } else {
171                 sa = Address.info(Address.APR_ANYADDR, Socket.APR_INET, 0, 0, pool);
172             }
173 
174             result = Socket.bind(handle, sa);
175             if (result != Status.APR_SUCCESS) {
176                 throwException(result);
177             }
178             result = Socket.listen(handle, getBacklog());
179             if (result != Status.APR_SUCCESS) {
180                 throwException(result);
181             }
182 
183             result = Poll.add(pollset, handle, Poll.APR_POLLIN);
184             if (result != Status.APR_SUCCESS) {
185                 throwException(result);
186             }
187             success = true;
188         } finally {
189             if (!success) {
190                 close(handle);
191             }
192         }
193         return handle;
194     }
195 
196     /**
197      * {@inheritDoc}
198      */
199     @Override
200     protected void init() throws Exception {
201         // initialize a memory pool for APR functions
202         pool = Pool.create(AprLibrary.getInstance().getRootPool());
203 
204         wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
205 
206         pollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
207 
208         if (pollset <= 0) {
209             pollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
210         }
211 
212         if (pollset <= 0) {
213             if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
214                 throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
215             }
216         }
217     }
218 
219     /**
220      * {@inheritDoc}
221      */
222     @Override
223     protected void destroy() throws Exception {
224         if (wakeupSocket > 0) {
225             Socket.close(wakeupSocket);
226         }
227         if (pollset > 0) {
228             Poll.destroy(pollset);
229         }
230         if (pool > 0) {
231             Pool.destroy(pool);
232         }
233     }
234 
235     /**
236      * {@inheritDoc}
237      */
238     @Override
239     protected SocketAddress localAddress(Long handle) throws Exception {
240         long la = Address.get(Socket.APR_LOCAL, handle);
241         return new InetSocketAddress(Address.getip(la), Address.getInfo(la).port);
242     }
243 
244     /**
245      * {@inheritDoc}
246      */
247     @Override
248     protected int select() throws Exception {
249         int rv = Poll.poll(pollset, Integer.MAX_VALUE, polledSockets, false);
250         if (rv <= 0) {
251             // We have had an error. It can simply be that we have reached
252             // the timeout (very unlikely, as we have set it to MAX_INTEGER)
253             if (rv != APR_TIMEUP_ERROR) {
254                 // It's not a timeout being exceeded. Throw the error
255                 throwException(rv);
256             }
257 
258             rv = Poll.maintain(pollset, polledSockets, true);
259             if (rv > 0) {
260                 for (int i = 0; i < rv; i++) {
261                     Poll.add(pollset, polledSockets[i], Poll.APR_POLLIN);
262                 }
263             } else if (rv < 0) {
264                 throwException(rv);
265             }
266 
267             return 0;
268         } else {
269             rv <<= 1;
270             if (!polledHandles.isEmpty()) {
271                 polledHandles.clear();
272             }
273 
274             for (int i = 0; i < rv; i++) {
275                 long flag = polledSockets[i];
276                 long socket = polledSockets[++i];
277                 if (socket == wakeupSocket) {
278                     synchronized (wakeupLock) {
279                         Poll.remove(pollset, wakeupSocket);
280                         toBeWakenUp = false;
281                     }
282                     continue;
283                 }
284 
285                 if ((flag & Poll.APR_POLLIN) != 0) {
286                     polledHandles.add(socket);
287                 }
288             }
289             return polledHandles.size();
290         }
291     }
292 
293     /**
294      * {@inheritDoc}
295      */
296     @Override
297     protected Iterator<Long> selectedHandles() {
298         return polledHandles.iterator();
299     }
300 
301     /**
302      * {@inheritDoc}
303      */
304     @Override
305     protected void close(Long handle) throws Exception {
306         Poll.remove(pollset, handle);
307         int result = Socket.close(handle);
308         if (result != Status.APR_SUCCESS) {
309             throwException(result);
310         }
311     }
312 
313     /**
314      * {@inheritDoc}
315      */
316     @Override
317     protected void wakeup() {
318         if (toBeWakenUp) {
319             return;
320         }
321 
322         // Add a dummy socket to the pollset.
323         synchronized (wakeupLock) {
324             toBeWakenUp = true;
325             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
326         }
327     }
328 
329     /**
330      * {@inheritDoc}
331      */
332     @Override
333     public InetSocketAddress getLocalAddress() {
334         return (InetSocketAddress) super.getLocalAddress();
335     }
336 
337     /**
338      * {@inheritDoc}
339      */
340     @Override
341     public InetSocketAddress getDefaultLocalAddress() {
342         return (InetSocketAddress) super.getDefaultLocalAddress();
343     }
344 
345     /**
346      * {@inheritDoc}
347      */
348     public void setDefaultLocalAddress(InetSocketAddress localAddress) {
349         super.setDefaultLocalAddress(localAddress);
350     }
351 
352     /**
353      * {@inheritDoc}
354      */
355     public TransportMetadata getTransportMetadata() {
356         return AprSocketSession.METADATA;
357     }
358 
359     /**
360      * Convert an APR code into an Exception with the corresponding message
361      * @param code error number
362      * @throws IOException the generated exception
363      */
364     private void throwException(int code) throws IOException {
365         throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
366     }
367 }