View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.handler.stream;
21  
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.net.SocketTimeoutException;
26  
27  import org.apache.mina.core.buffer.IoBuffer;
28  import org.apache.mina.core.service.IoHandler;
29  import org.apache.mina.core.service.IoHandlerAdapter;
30  import org.apache.mina.core.session.AttributeKey;
31  import org.apache.mina.core.session.IdleStatus;
32  import org.apache.mina.core.session.IoSession;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * A {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
38   * <p>
39   * Please extend this class and implement
40   * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
41   * execute your stream I/O logic; <b>please note that you must forward
42   * the process request to other thread or thread pool.</b>
43   *
44   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
45   */
46  public abstract class StreamIoHandler extends IoHandlerAdapter {
47      private final static Logger LOGGER = LoggerFactory.getLogger(StreamIoHandler.class);
48  
49      private static final AttributeKey KEY_IN = new AttributeKey(StreamIoHandler.class, "in");
50  
51      private static final AttributeKey KEY_OUT = new AttributeKey(StreamIoHandler.class, "out");
52  
53      private int readTimeout;
54  
55      private int writeTimeout;
56  
57      protected StreamIoHandler() {
58          // Do nothing
59      }
60  
61      /**
62       * Implement this method to execute your stream I/O logic;
63       * <b>please note that you must forward the process request to other
64       * thread or thread pool.</b>
65       * 
66       * @param session The current session
67       * @param in The input stream
68       * @param out The output stream
69       */
70      protected abstract void processStreamIo(IoSession session, InputStream in, OutputStream out);
71  
72      /**
73       * @return read timeout in seconds.
74       * The default value is <tt>0</tt> (disabled).
75       */
76      public int getReadTimeout() {
77          return readTimeout;
78      }
79  
80      /**
81       * Sets read timeout in seconds.
82       * The default value is <tt>0</tt> (disabled).
83       * @param readTimeout The Read timeout
84       */
85      public void setReadTimeout(int readTimeout) {
86          this.readTimeout = readTimeout;
87      }
88  
89      /**
90       * @return write timeout in seconds.
91       * The default value is <tt>0</tt> (disabled).
92       */
93      public int getWriteTimeout() {
94          return writeTimeout;
95      }
96  
97      /**
98       * Sets write timeout in seconds.
99       * The default value is <tt>0</tt> (disabled).
100      * 
101      * @param writeTimeout The Write timeout
102      */
103     public void setWriteTimeout(int writeTimeout) {
104         this.writeTimeout = writeTimeout;
105     }
106 
107     /**
108      * Initializes streams and timeout settings.
109      */
110     @Override
111     public void sessionOpened(IoSession session) {
112         // Set timeouts
113         session.getConfig().setWriteTimeout(writeTimeout);
114         session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readTimeout);
115 
116         // Create streams
117         InputStream in = new IoSessionInputStream();
118         OutputStream out = new IoSessionOutputStream(session);
119         session.setAttribute(KEY_IN, in);
120         session.setAttribute(KEY_OUT, out);
121         processStreamIo(session, in, out);
122     }
123 
124     /**
125      * Closes streams
126      */
127     @Override
128     public void sessionClosed(IoSession session) throws Exception {
129         final InputStream in = (InputStream) session.getAttribute(KEY_IN);
130         final OutputStream out = (OutputStream) session.getAttribute(KEY_OUT);
131         try {
132             in.close();
133         } finally {
134             out.close();
135         }
136     }
137 
138     /**
139      * Forwards read data to input stream.
140      */
141     @Override
142     public void messageReceived(IoSession session, Object buf) {
143         final IoSessionInputStream in = (IoSessionInputStream) session.getAttribute(KEY_IN);
144         in.write((IoBuffer) buf);
145     }
146 
147     /**
148      * Forwards caught exceptions to input stream.
149      */
150     @Override
151     public void exceptionCaught(IoSession session, Throwable cause) {
152         final IoSessionInputStream in = (IoSessionInputStream) session.getAttribute(KEY_IN);
153 
154         IOException e = null;
155         if (cause instanceof StreamIoException) {
156             e = (IOException) cause.getCause();
157         } else if (cause instanceof IOException) {
158             e = (IOException) cause;
159         }
160 
161         if (e != null && in != null) {
162             in.throwException(e);
163         } else {
164             LOGGER.warn("Unexpected exception.", cause);
165             session.close(true);
166         }
167     }
168 
169     /**
170      * Handles read timeout.
171      */
172     @Override
173     public void sessionIdle(IoSession session, IdleStatus status) {
174         if (status == IdleStatus.READER_IDLE) {
175             throw new StreamIoException(new SocketTimeoutException("Read timeout"));
176         }
177     }
178 
179     private static class StreamIoException extends RuntimeException {
180         private static final long serialVersionUID = 3976736960742503222L;
181 
182         public StreamIoException(IOException cause) {
183             super(cause);
184         }
185     }
186 }