View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.Executor;
29  
30  import org.apache.mina.core.RuntimeIoException;
31  import org.apache.mina.core.buffer.IoBuffer;
32  import org.apache.mina.core.file.FileRegion;
33  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
34  import org.apache.mina.core.session.SessionState;
35  import org.apache.mina.util.CircularQueue;
36  import org.apache.tomcat.jni.Poll;
37  import org.apache.tomcat.jni.Pool;
38  import org.apache.tomcat.jni.Socket;
39  import org.apache.tomcat.jni.Status;
40  
41  /**
42   * The class in charge of processing socket level IO events for the {@link AprSocketConnector}
43   *
44   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
45   */
46  public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
47      private static final int POLLSET_SIZE = 1024;
48  
49      private final Map<Long, AprSession> allSessions =
50          new HashMap<Long, AprSession>(POLLSET_SIZE);
51  
52      private final Object wakeupLock = new Object();
53      private final long wakeupSocket;
54      private volatile boolean toBeWakenUp;
55  
56      private final long pool;
57      private final long bufferPool; // memory pool
58      private final long pollset; // socket poller
59      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
60      private final List<AprSession> polledSessions =
61          new CircularQueue<AprSession>(POLLSET_SIZE);
62  
63      /**
64       * Create a new instance of {@link AprIoProcessor} with a given Exector for 
65       * handling I/Os events.
66       * 
67       * @param executor the {@link Executor} for handling I/O events
68       */
69      public AprIoProcessor(Executor executor) {
70          super(executor);
71  
72          // initialize a memory pool for APR functions
73          pool = Pool.create(AprLibrary.getInstance().getRootPool());
74          bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
75  
76          try {
77              wakeupSocket = Socket.create(
78                      Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
79          } catch (RuntimeException e) {
80              throw e;
81          } catch (Error e) {
82              throw e;
83          } catch (Exception e) {
84              throw new RuntimeIoException("Failed to create a wakeup socket.", e);
85          }
86  
87          boolean success = false;
88          long newPollset;
89          try {
90              newPollset = Poll.create(
91                      POLLSET_SIZE,
92                      pool,
93                      Poll.APR_POLLSET_THREADSAFE,
94                      Long.MAX_VALUE);
95  
96              if (newPollset == 0) {
97                  newPollset = Poll.create(
98                          62,
99                          pool,
100                         Poll.APR_POLLSET_THREADSAFE,
101                         Long.MAX_VALUE);
102             }
103 
104             pollset = newPollset;
105             if (pollset < 0) {
106                 if (Status.APR_STATUS_IS_ENOTIMPL(- (int) pollset)) {
107                     throw new RuntimeIoException(
108                             "Thread-safe pollset is not supported in this platform.");
109                 }
110             }
111             success = true;
112         } catch (RuntimeException e) {
113             throw e;
114         } catch (Error e) {
115             throw e;
116         } catch (Exception e) {
117             throw new RuntimeIoException("Failed to create a pollset.", e);
118         } finally {
119             if (!success) {
120                 dispose();
121             }
122         }
123     }
124 
125     /**
126      * {@inheritDoc}
127      */
128     @Override
129     protected void dispose0() {
130         Poll.destroy(pollset);
131         Socket.close(wakeupSocket);
132         Pool.destroy(bufferPool);
133         Pool.destroy(pool);
134     }
135 
136     /**
137      * {@inheritDoc}
138      */
139     @Override
140     protected int select() throws Exception {
141         return select(Integer.MAX_VALUE);
142     }
143 
144     /**
145      * {@inheritDoc}
146      */
147      @Override
148     protected int select(long timeout) throws Exception {
149         int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
150         if (rv <= 0) {
151             if (rv != -120001) {
152                 throwException(rv);
153             }
154 
155             rv = Poll.maintain(pollset, polledSockets, true);
156             if (rv > 0) {
157                 for (int i = 0; i < rv; i ++) {
158                     long socket = polledSockets[i];
159                     AprSession session = allSessions.get(socket);
160                     if (session == null) {
161                         continue;
162                     }
163 
164                     int flag = (session.isInterestedInRead()? Poll.APR_POLLIN : 0) |
165                                (session.isInterestedInWrite()? Poll.APR_POLLOUT : 0);
166 
167                     Poll.add(pollset, socket, flag);
168                 }
169             } else if (rv < 0) {
170                 throwException(rv);
171             }
172 
173             return 0;
174         } else {
175             rv <<= 1;
176             if (!polledSessions.isEmpty()) {
177                 polledSessions.clear();
178             }
179             for (int i = 0; i < rv; i ++) {
180                 long flag = polledSockets[i];
181                 long socket = polledSockets[++i];
182                 if (socket == wakeupSocket) {
183                     synchronized (wakeupLock) {
184                         Poll.remove(pollset, wakeupSocket);
185                         toBeWakenUp = false;
186                     }
187                     continue;
188                 }
189                 AprSession session = allSessions.get(socket);
190                 if (session == null) {
191                     continue;
192                 }
193 
194                 session.setReadable((flag & Poll.APR_POLLIN) != 0);
195                 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
196 
197                 polledSessions.add(session);
198             }
199 
200             return polledSessions.size();
201         }
202     }
203 
204      /**
205      * {@inheritDoc}
206      */
207     @Override
208     protected boolean isSelectorEmpty() {
209         return allSessions.isEmpty();
210     }
211 
212     /**
213      * {@inheritDoc}
214      */
215     @Override
216     protected void wakeup() {
217         if (toBeWakenUp) {
218             return;
219         }
220 
221         // Add a dummy socket to the pollset.
222         synchronized (wakeupLock) {
223             toBeWakenUp = true;
224             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
225         }
226     }
227 
228     /**
229      * {@inheritDoc}
230      */
231     @Override
232     protected Iterator<AprSession> allSessions() {
233         return allSessions.values().iterator();
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     protected Iterator<AprSession> selectedSessions() {
241         return polledSessions.iterator();
242     }
243 
244     @Override
245     protected void init(AprSession session) throws Exception {
246         long s = session.getDescriptor();
247         Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
248         Socket.timeoutSet(s, 0);
249 
250         int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
251         if (rv != Status.APR_SUCCESS) {
252             throwException(rv);
253         }
254 
255         session.setInterestedInRead(true);
256         allSessions.put(s, session);
257     }
258 
259     /**
260      * {@inheritDoc}
261      */
262     @Override
263     protected void destroy(AprSession session) throws Exception {
264         if (allSessions.remove(session.getDescriptor()) == null) {
265             // Already destroyed.
266             return;
267         }
268 
269         int ret = Poll.remove(pollset, session.getDescriptor());
270         try {
271             if (ret != Status.APR_SUCCESS) {
272                 throwException(ret);
273             }
274         } finally {
275             ret = Socket.close(session.getDescriptor());
276             
277             // destroying the session because it won't be reused 
278             // after this point
279             Socket.destroy(session.getDescriptor());
280             session.setDescriptor(0);
281             
282             if (ret != Status.APR_SUCCESS) {
283                 throwException(ret);
284             }
285         }
286     }
287 
288     /**
289      * {@inheritDoc}
290      */
291     @Override
292     protected SessionState getState(AprSession session) {
293         long socket = session.getDescriptor();
294         
295         if (socket != 0) {
296             return SessionState.OPENED;
297         } else if (allSessions.get(socket) != null) {
298             return SessionState.OPENING; // will occur ?
299         } else {
300             return SessionState.CLOSING;
301         }
302     }
303 
304     /**
305      * {@inheritDoc}
306      */
307     @Override
308     protected boolean isReadable(AprSession session) {
309         return session.isReadable();
310     }
311 
312     /**
313      * {@inheritDoc}
314      */
315     @Override
316     protected boolean isWritable(AprSession session) {
317         return session.isWritable();
318     }
319 
320     /**
321      * {@inheritDoc}
322      */
323     @Override
324     protected boolean isInterestedInRead(AprSession session) {
325         return session.isInterestedInRead();
326     }
327 
328     /**
329      * {@inheritDoc}
330      */
331     @Override
332     protected boolean isInterestedInWrite(AprSession session) {
333         return session.isInterestedInWrite();
334     }
335 
336     /**
337      * {@inheritDoc}
338      */
339     @Override
340     protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
341         if (session.isInterestedInRead() == isInterested) {
342             return;
343         }
344 
345         int rv = Poll.remove(pollset, session.getDescriptor());
346         if (rv != Status.APR_SUCCESS) {
347             throwException(rv);
348         }
349 
350         int flags = (isInterested ? Poll.APR_POLLIN : 0)
351                 | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
352 
353         rv = Poll.add(pollset, session.getDescriptor(), flags);
354         if (rv == Status.APR_SUCCESS) {
355             session.setInterestedInRead(isInterested);
356         } else {
357             throwException(rv);
358         }
359     }
360 
361     /**
362      * {@inheritDoc}
363      */
364     @Override
365     protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
366         if (session.isInterestedInWrite() == isInterested) {
367             return;
368         }
369 
370         int rv = Poll.remove(pollset, session.getDescriptor());
371         if (rv != Status.APR_SUCCESS) {
372             throwException(rv);
373         }
374 
375         int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
376                 | (isInterested ? Poll.APR_POLLOUT : 0);
377 
378         rv = Poll.add(pollset, session.getDescriptor(), flags);
379         if (rv == Status.APR_SUCCESS) {
380             session.setInterestedInWrite(isInterested);
381         } else {
382             throwException(rv);
383         }
384     }
385 
386     /**
387      * {@inheritDoc}
388      */
389     @Override
390     protected int read(AprSession session, IoBuffer buffer) throws Exception {
391         int bytes;
392         int capacity = buffer.remaining();
393         // Using Socket.recv() directly causes memory leak. :-(
394         ByteBuffer b = Pool.alloc(bufferPool, capacity);
395         
396         try {
397             bytes = Socket.recvb(
398                     session.getDescriptor(), b, 0, capacity);
399             
400             if (bytes > 0) {
401                 b.position(0);
402                 b.limit(bytes);
403                 buffer.put(b);
404             } else if (bytes < 0) {
405                 if (Status.APR_STATUS_IS_EOF(-bytes)) {
406                     bytes = -1;
407                 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
408                     bytes = 0;
409                 } else {
410                     throwException(bytes);
411                 }
412             }
413         } finally {
414             Pool.clear(bufferPool);
415         }
416         
417         return bytes;
418     }
419 
420     /**
421      * {@inheritDoc}
422      */
423     @Override
424     protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
425         int writtenBytes;
426         if (buf.isDirect()) {
427             writtenBytes = Socket.sendb(
428                     session.getDescriptor(), buf.buf(), buf.position(), length);
429         } else {
430             writtenBytes = Socket.send(
431                     session.getDescriptor(), buf.array(), buf.position(), length);
432             if (writtenBytes > 0) {
433                 buf.skip(writtenBytes);
434             }
435         }
436 
437         if (writtenBytes < 0) {
438             if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
439                 writtenBytes = 0;
440             } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
441                 writtenBytes = 0;
442             } else {
443                 throwException(writtenBytes);
444             }
445         }
446         return writtenBytes;
447     }
448 
449     /**
450      * {@inheritDoc}
451      */
452     @Override
453     protected int transferFile(AprSession session, FileRegion region, int length)
454             throws Exception {
455         throw new UnsupportedOperationException();
456     }
457 
458     private void throwException(int code) throws IOException {
459         throw new IOException(
460                 org.apache.tomcat.jni.Error.strerror(-code) +
461                 " (code: " + code + ")");
462     }
463 }