View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.transport.socket.apr;
21  
22  import java.io.IOException;
23  import java.nio.ByteBuffer;
24  import java.util.HashMap;
25  import java.util.Iterator;
26  import java.util.Map;
27  import java.util.Queue;
28  import java.util.concurrent.ConcurrentLinkedQueue;
29  import java.util.concurrent.Executor;
30  
31  import org.apache.mina.core.RuntimeIoException;
32  import org.apache.mina.core.buffer.IoBuffer;
33  import org.apache.mina.core.file.FileRegion;
34  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
35  import org.apache.mina.core.session.SessionState;
36  import org.apache.tomcat.jni.File;
37  import org.apache.tomcat.jni.Poll;
38  import org.apache.tomcat.jni.Pool;
39  import org.apache.tomcat.jni.Socket;
40  import org.apache.tomcat.jni.Status;
41  
42  /**
43   * The class in charge of processing socket level IO events for the
44   * {@link AprSocketConnector}
45   *
46   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
47   */
48  public final class AprIoProcessor extends AbstractPollingIoProcessor<AprSession> {
49      private static final int POLLSET_SIZE = 1024;
50  
51      private final Map<Long, AprSession> allSessions = new HashMap<Long, AprSession>(POLLSET_SIZE);
52  
53      private final Object wakeupLock = new Object();
54  
55      private final long wakeupSocket;
56  
57      private volatile boolean toBeWakenUp;
58  
59      private final long pool;
60  
61      private final long bufferPool; // memory pool
62  
63      private final long pollset; // socket poller
64  
65      private final long[] polledSockets = new long[POLLSET_SIZE << 1];
66  
67      private final Queue<AprSession> polledSessions = new ConcurrentLinkedQueue<AprSession>();
68  
69      /**
70       * Create a new instance of {@link AprIoProcessor} with a given Exector for
71       * handling I/Os events.
72       *
73       * @param executor
74       *            the {@link Executor} for handling I/O events
75       */
76      public AprIoProcessor(Executor executor) {
77          super(executor);
78  
79          // initialize a memory pool for APR functions
80          pool = Pool.create(AprLibrary.getInstance().getRootPool());
81          bufferPool = Pool.create(AprLibrary.getInstance().getRootPool());
82  
83          try {
84              wakeupSocket = Socket.create(Socket.APR_INET, Socket.SOCK_DGRAM, Socket.APR_PROTO_UDP, pool);
85          } catch (RuntimeException e) {
86              throw e;
87          } catch (Error e) {
88              throw e;
89          } catch (Exception e) {
90              throw new RuntimeIoException("Failed to create a wakeup socket.", e);
91          }
92  
93          boolean success = false;
94          long newPollset;
95          try {
96              newPollset = Poll.create(POLLSET_SIZE, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
97  
98              if (newPollset == 0) {
99                  newPollset = Poll.create(62, pool, Poll.APR_POLLSET_THREADSAFE, Long.MAX_VALUE);
100             }
101 
102             pollset = newPollset;
103             if (pollset < 0) {
104                 if (Status.APR_STATUS_IS_ENOTIMPL(-(int) pollset)) {
105                     throw new RuntimeIoException("Thread-safe pollset is not supported in this platform.");
106                 }
107             }
108             success = true;
109         } catch (RuntimeException e) {
110             throw e;
111         } catch (Error e) {
112             throw e;
113         } catch (Exception e) {
114             throw new RuntimeIoException("Failed to create a pollset.", e);
115         } finally {
116             if (!success) {
117                 dispose();
118             }
119         }
120     }
121 
122     /**
123      * {@inheritDoc}
124      */
125     @Override
126     protected void doDispose() {
127         Poll.destroy(pollset);
128         Socket.close(wakeupSocket);
129         Pool.destroy(bufferPool);
130         Pool.destroy(pool);
131     }
132 
133     /**
134      * {@inheritDoc}
135      */
136     @Override
137     protected int select() throws Exception {
138         return select(Integer.MAX_VALUE);
139     }
140 
141     /**
142      * {@inheritDoc}
143      */
144     @Override
145     protected int select(long timeout) throws Exception {
146         int rv = Poll.poll(pollset, 1000 * timeout, polledSockets, false);
147         if (rv <= 0) {
148             if (rv != -120001) {
149                 throwException(rv);
150             }
151 
152             rv = Poll.maintain(pollset, polledSockets, true);
153             if (rv > 0) {
154                 for (int i = 0; i < rv; i++) {
155                     long socket = polledSockets[i];
156                     AprSession session = allSessions.get(socket);
157                     if (session == null) {
158                         continue;
159                     }
160 
161                     int flag = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0)
162                             | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
163 
164                     Poll.add(pollset, socket, flag);
165                 }
166             } else if (rv < 0) {
167                 throwException(rv);
168             }
169 
170             return 0;
171         } else {
172             rv <<= 1;
173             if (!polledSessions.isEmpty()) {
174                 polledSessions.clear();
175             }
176             for (int i = 0; i < rv; i++) {
177                 long flag = polledSockets[i];
178                 long socket = polledSockets[++i];
179                 if (socket == wakeupSocket) {
180                     synchronized (wakeupLock) {
181                         Poll.remove(pollset, wakeupSocket);
182                         toBeWakenUp = false;
183                         wakeupCalled.set(true);
184                     }
185                     continue;
186                 }
187                 AprSession session = allSessions.get(socket);
188                 if (session == null) {
189                     continue;
190                 }
191 
192                 session.setReadable((flag & Poll.APR_POLLIN) != 0);
193                 session.setWritable((flag & Poll.APR_POLLOUT) != 0);
194 
195                 polledSessions.add(session);
196             }
197 
198             return polledSessions.size();
199         }
200     }
201 
202     /**
203      * {@inheritDoc}
204      */
205     @Override
206     protected boolean isSelectorEmpty() {
207         return allSessions.isEmpty();
208     }
209 
210     /**
211      * {@inheritDoc}
212      */
213     @Override
214     protected void wakeup() {
215         if (toBeWakenUp) {
216             return;
217         }
218 
219         // Add a dummy socket to the pollset.
220         synchronized (wakeupLock) {
221             toBeWakenUp = true;
222             Poll.add(pollset, wakeupSocket, Poll.APR_POLLOUT);
223         }
224     }
225 
226     /**
227      * {@inheritDoc}
228      */
229     @Override
230     protected Iterator<AprSession> allSessions() {
231         return allSessions.values().iterator();
232     }
233 
234     /**
235      * {@inheritDoc}
236      */
237     @Override
238     protected Iterator<AprSession> selectedSessions() {
239         return polledSessions.iterator();
240     }
241 
242     @Override
243     protected void init(AprSession session) throws Exception {
244         long s = session.getDescriptor();
245         Socket.optSet(s, Socket.APR_SO_NONBLOCK, 1);
246         Socket.timeoutSet(s, 0);
247 
248         int rv = Poll.add(pollset, s, Poll.APR_POLLIN);
249         if (rv != Status.APR_SUCCESS) {
250             throwException(rv);
251         }
252 
253         session.setInterestedInRead(true);
254         allSessions.put(s, session);
255     }
256 
257     /**
258      * {@inheritDoc}
259      */
260     @Override
261     protected void destroy(AprSession session) throws Exception {
262         if (allSessions.remove(session.getDescriptor()) == null) {
263             // Already destroyed.
264             return;
265         }
266 
267         int ret = Poll.remove(pollset, session.getDescriptor());
268         try {
269             if (ret != Status.APR_SUCCESS) {
270                 throwException(ret);
271             }
272         } finally {
273             ret = Socket.close(session.getDescriptor());
274 
275             // destroying the session because it won't be reused
276             // after this point
277             Socket.destroy(session.getDescriptor());
278             session.setDescriptor(0);
279 
280             if (ret != Status.APR_SUCCESS) {
281                 throwException(ret);
282             }
283         }
284     }
285 
286     /**
287      * {@inheritDoc}
288      */
289     @Override
290     protected SessionState getState(AprSession session) {
291         long socket = session.getDescriptor();
292 
293         if (socket != 0) {
294             return SessionState.OPENED;
295         } else if (allSessions.get(socket) != null) {
296             return SessionState.OPENING; // will occur ?
297         } else {
298             return SessionState.CLOSING;
299         }
300     }
301 
302     /**
303      * {@inheritDoc}
304      */
305     @Override
306     protected boolean isReadable(AprSession session) {
307         return session.isReadable();
308     }
309 
310     /**
311      * {@inheritDoc}
312      */
313     @Override
314     protected boolean isWritable(AprSession session) {
315         return session.isWritable();
316     }
317 
318     /**
319      * {@inheritDoc}
320      */
321     @Override
322     protected boolean isInterestedInRead(AprSession session) {
323         return session.isInterestedInRead();
324     }
325 
326     /**
327      * {@inheritDoc}
328      */
329     @Override
330     protected boolean isInterestedInWrite(AprSession session) {
331         return session.isInterestedInWrite();
332     }
333 
334     /**
335      * {@inheritDoc}
336      */
337     @Override
338     protected void setInterestedInRead(AprSession session, boolean isInterested) throws Exception {
339         if (session.isInterestedInRead() == isInterested) {
340             return;
341         }
342 
343         int rv = Poll.remove(pollset, session.getDescriptor());
344 
345         if (rv != Status.APR_SUCCESS) {
346             throwException(rv);
347         }
348 
349         int flags = (isInterested ? Poll.APR_POLLIN : 0) | (session.isInterestedInWrite() ? Poll.APR_POLLOUT : 0);
350 
351         rv = Poll.add(pollset, session.getDescriptor(), flags);
352 
353         if (rv == Status.APR_SUCCESS) {
354             session.setInterestedInRead(isInterested);
355         } else {
356             throwException(rv);
357         }
358     }
359 
360     /**
361      * {@inheritDoc}
362      */
363     @Override
364     protected void setInterestedInWrite(AprSession session, boolean isInterested) throws Exception {
365         if (session.isInterestedInWrite() == isInterested) {
366             return;
367         }
368 
369         int rv = Poll.remove(pollset, session.getDescriptor());
370 
371         if (rv != Status.APR_SUCCESS) {
372             throwException(rv);
373         }
374 
375         int flags = (session.isInterestedInRead() ? Poll.APR_POLLIN : 0) | (isInterested ? Poll.APR_POLLOUT : 0);
376 
377         rv = Poll.add(pollset, session.getDescriptor(), flags);
378 
379         if (rv == Status.APR_SUCCESS) {
380             session.setInterestedInWrite(isInterested);
381         } else {
382             throwException(rv);
383         }
384     }
385 
386     /**
387      * {@inheritDoc}
388      */
389     @Override
390     protected int read(AprSession session, IoBuffer buffer) throws Exception {
391         int bytes;
392         int capacity = buffer.remaining();
393         // Using Socket.recv() directly causes memory leak. :-(
394         ByteBuffer b = Pool.alloc(bufferPool, capacity);
395 
396         try {
397             bytes = Socket.recvb(session.getDescriptor(), b, 0, capacity);
398 
399             if (bytes > 0) {
400                 b.position(0);
401                 b.limit(bytes);
402                 buffer.put(b);
403             } else if (bytes < 0) {
404                 if (Status.APR_STATUS_IS_EOF(-bytes)) {
405                     bytes = -1;
406                 } else if (Status.APR_STATUS_IS_EAGAIN(-bytes)) {
407                     bytes = 0;
408                 } else {
409                     throwException(bytes);
410                 }
411             }
412         } finally {
413             Pool.clear(bufferPool);
414         }
415 
416         return bytes;
417     }
418 
419     /**
420      * {@inheritDoc}
421      */
422     @Override
423     protected int write(AprSession session, IoBuffer buf, int length) throws Exception {
424         int writtenBytes;
425         if (buf.isDirect()) {
426             writtenBytes = Socket.sendb(session.getDescriptor(), buf.buf(), buf.position(), length);
427         } else {
428             writtenBytes = Socket.send(session.getDescriptor(), buf.array(), buf.position(), length);
429             if (writtenBytes > 0) {
430                 buf.skip(writtenBytes);
431             }
432         }
433 
434         if (writtenBytes < 0) {
435             if (Status.APR_STATUS_IS_EAGAIN(-writtenBytes)) {
436                 writtenBytes = 0;
437             } else if (Status.APR_STATUS_IS_EOF(-writtenBytes)) {
438                 writtenBytes = 0;
439             } else {
440                 throwException(writtenBytes);
441             }
442         }
443         return writtenBytes;
444     }
445 
446     /**
447      * {@inheritDoc}
448      */
449     @Override
450     protected int transferFile(AprSession session, FileRegion region, int length) throws Exception {
451         if (region.getFilename() == null) {
452             throw new UnsupportedOperationException();
453         }
454 
455         long fd = File.open(region.getFilename(), File.APR_FOPEN_READ | File.APR_FOPEN_SENDFILE_ENABLED
456                 | File.APR_FOPEN_BINARY, 0, Socket.pool(session.getDescriptor()));
457         long numWritten = Socket.sendfilen(session.getDescriptor(), fd, region.getPosition(), length, 0);
458         File.close(fd);
459 
460         if (numWritten < 0) {
461             if (numWritten == -Status.EAGAIN) {
462                 return 0;
463             }
464             throw new IOException(org.apache.tomcat.jni.Error.strerror((int) -numWritten) + " (code: " + numWritten
465                     + ")");
466         }
467         return (int) numWritten;
468     }
469 
470     private void throwException(int code) throws IOException {
471         throw new IOException(org.apache.tomcat.jni.Error.strerror(-code) + " (code: " + code + ")");
472     }
473 
474     /**
475      * {@inheritDoc}
476      */
477     @Override
478     protected void registerNewSelector() {
479         // Do nothing
480     }
481 
482     /**
483      * {@inheritDoc}
484      */
485     @Override
486     protected boolean isBrokenConnection() throws IOException {
487         // Here, we assume that this is the case.
488         return true;
489     }
490 }