View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.handler.stream;
21  
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.io.OutputStream;
25  import java.net.SocketTimeoutException;
26  
27  import org.apache.mina.core.buffer.IoBuffer;
28  import org.apache.mina.core.service.IoHandler;
29  import org.apache.mina.core.service.IoHandlerAdapter;
30  import org.apache.mina.core.session.AttributeKey;
31  import org.apache.mina.core.session.IdleStatus;
32  import org.apache.mina.core.session.IoSession;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * A {@link IoHandler} that adapts asynchronous MINA events to stream I/O.
38   * <p>
39   * Please extend this class and implement
40   * {@link #processStreamIo(IoSession, InputStream, OutputStream)} to
41   * execute your stream I/O logic; <b>please note that you must forward
42   * the process request to other thread or thread pool.</b>
43   *
44   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
45   */
46  public abstract class StreamIoHandler extends IoHandlerAdapter {
47      private final static Logger LOGGER = LoggerFactory.getLogger(StreamIoHandler.class);
48  
49      private static final AttributeKey KEY_IN = new AttributeKey(StreamIoHandler.class, "in");
50  
51      private static final AttributeKey KEY_OUT = new AttributeKey(StreamIoHandler.class, "out");
52  
53      private int readTimeout;
54  
55      private int writeTimeout;
56  
57      protected StreamIoHandler() {
58          // Do nothing
59      }
60  
61      /**
62       * Implement this method to execute your stream I/O logic;
63       * <b>please note that you must forward the process request to other
64       * thread or thread pool.</b>
65       */
66      protected abstract void processStreamIo(IoSession session, InputStream in, OutputStream out);
67  
68      /**
69       * Returns read timeout in seconds.
70       * The default value is <tt>0</tt> (disabled).
71       */
72      public int getReadTimeout() {
73          return readTimeout;
74      }
75  
76      /**
77       * Sets read timeout in seconds.
78       * The default value is <tt>0</tt> (disabled).
79       */
80      public void setReadTimeout(int readTimeout) {
81          this.readTimeout = readTimeout;
82      }
83  
84      /**
85       * Returns write timeout in seconds.
86       * The default value is <tt>0</tt> (disabled).
87       */
88      public int getWriteTimeout() {
89          return writeTimeout;
90      }
91  
92      /**
93       * Sets write timeout in seconds.
94       * The default value is <tt>0</tt> (disabled).
95       */
96      public void setWriteTimeout(int writeTimeout) {
97          this.writeTimeout = writeTimeout;
98      }
99  
100     /**
101      * Initializes streams and timeout settings.
102      */
103     @Override
104     public void sessionOpened(IoSession session) {
105         // Set timeouts
106         session.getConfig().setWriteTimeout(writeTimeout);
107         session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readTimeout);
108 
109         // Create streams
110         InputStream in = new IoSessionInputStream();
111         OutputStream out = new IoSessionOutputStream(session);
112         session.setAttribute(KEY_IN, in);
113         session.setAttribute(KEY_OUT, out);
114         processStreamIo(session, in, out);
115     }
116 
117     /**
118      * Closes streams
119      */
120     @Override
121     public void sessionClosed(IoSession session) throws Exception {
122         final InputStream in = (InputStream) session.getAttribute(KEY_IN);
123         final OutputStream out = (OutputStream) session.getAttribute(KEY_OUT);
124         try {
125             in.close();
126         } finally {
127             out.close();
128         }
129     }
130 
131     /**
132      * Forwards read data to input stream.
133      */
134     @Override
135     public void messageReceived(IoSession session, Object buf) {
136         final IoSessionInputStream in = (IoSessionInputStream) session.getAttribute(KEY_IN);
137         in.write((IoBuffer) buf);
138     }
139 
140     /**
141      * Forwards caught exceptions to input stream.
142      */
143     @Override
144     public void exceptionCaught(IoSession session, Throwable cause) {
145         final IoSessionInputStream in = (IoSessionInputStream) session.getAttribute(KEY_IN);
146 
147         IOException e = null;
148         if (cause instanceof StreamIoException) {
149             e = (IOException) cause.getCause();
150         } else if (cause instanceof IOException) {
151             e = (IOException) cause;
152         }
153 
154         if (e != null && in != null) {
155             in.throwException(e);
156         } else {
157             LOGGER.warn("Unexpected exception.", cause);
158             session.close(true);
159         }
160     }
161 
162     /**
163      * Handles read timeout.
164      */
165     @Override
166     public void sessionIdle(IoSession session, IdleStatus status) {
167         if (status == IdleStatus.READER_IDLE) {
168             throw new StreamIoException(new SocketTimeoutException("Read timeout"));
169         }
170     }
171 
172     private static class StreamIoException extends RuntimeException {
173         private static final long serialVersionUID = 3976736960742503222L;
174 
175         public StreamIoException(IOException cause) {
176             super(cause);
177         }
178     }
179 }