View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.buffer;
21  
22  import java.io.BufferedOutputStream;
23  import java.util.concurrent.ConcurrentHashMap;
24  
25  import org.apache.mina.core.buffer.IoBuffer;
26  import org.apache.mina.core.filterchain.IoFilter;
27  import org.apache.mina.core.filterchain.IoFilterAdapter;
28  import org.apache.mina.core.session.IoSession;
29  import org.apache.mina.core.write.DefaultWriteRequest;
30  import org.apache.mina.core.write.WriteRequest;
31  import org.apache.mina.filter.codec.ProtocolCodecFilter;
32  import org.apache.mina.util.LazyInitializedCacheMap;
33  import org.slf4j.Logger;
34  import org.slf4j.LoggerFactory;
35  
36  /**
37   * An {@link IoFilter} implementation used to buffer outgoing {@link WriteRequest} almost
38   * like what {@link BufferedOutputStream} does. Using this filter allows to be less dependent
39   * from network latency. It is also useful when a session is generating very small messages
40   * too frequently and consequently generating unnecessary traffic overhead.
41   * 
42   * Please note that it should always be placed before the {@link ProtocolCodecFilter}
43   * as it only handles {@link WriteRequest}'s carrying {@link IoBuffer} objects.
44   * 
45   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46   * @since MINA 2.0.0-M2
47   * @org.apache.xbean.XBean
48   */
49  public final class BufferedWriteFilter extends IoFilterAdapter {
50      private final Logger logger = LoggerFactory.getLogger(BufferedWriteFilter.class);
51  
52      /**
53       * Default buffer size value in bytes.
54       */
55      public final static int DEFAULT_BUFFER_SIZE = 8192;
56  
57      /**
58       * The buffer size allocated for each new session's buffer.
59       */
60      private int bufferSize = DEFAULT_BUFFER_SIZE;
61  
62      /**
63       * The map that matches an {@link IoSession} and it's {@link IoBuffer}
64       * buffer.
65       */
66      private final LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap;
67  
68      /**
69       * Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE}
70       * bytes. Uses a default instance of {@link ConcurrentHashMap}.
71       */
72      public BufferedWriteFilter() {
73          this(DEFAULT_BUFFER_SIZE, null);
74      }
75  
76      /**
77       * Constructor which sets buffer size to <code>bufferSize</code>.Uses a default
78       * instance of {@link ConcurrentHashMap}.
79       * 
80       * @param bufferSize the new buffer size
81       */
82      public BufferedWriteFilter(int bufferSize) {
83          this(bufferSize, null);
84      }
85  
86      /**
87       * Constructor which sets buffer size to <code>bufferSize</code>. If
88       * <code>buffersMap</code> is null then a default instance of {@link ConcurrentHashMap}
89       * is created else the provided instance is used.
90       * 
91       * @param bufferSize the new buffer size
92       * @param buffersMap the map to use for storing each session buffer
93       */
94      public BufferedWriteFilter(int bufferSize, LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap) {
95          super();
96          this.bufferSize = bufferSize;
97          if (buffersMap == null) {
98              this.buffersMap = new LazyInitializedCacheMap<IoSession, IoBuffer>();
99          } else {
100             this.buffersMap = buffersMap;
101         }
102     }
103 
104     /**
105      * Returns buffer size.
106      */
107     public int getBufferSize() {
108         return bufferSize;
109     }
110 
111     /**
112      * Sets the buffer size but only for the newly created buffers.
113      * 
114      * @param bufferSize the new buffer size
115      */
116     public void setBufferSize(int bufferSize) {
117         this.bufferSize = bufferSize;
118     }
119 
120     /**
121      * {@inheritDoc}
122      * 
123      * @throws Exception if <code>writeRequest.message</code> isn't an
124      *                   {@link IoBuffer} instance.
125      */
126     @Override
127     public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
128 
129         Object data = writeRequest.getMessage();
130 
131         if (data instanceof IoBuffer) {
132             write(session, (IoBuffer) data);
133         } else {
134             throw new IllegalArgumentException("This filter should only buffer IoBuffer objects");
135         }
136     }
137 
138     /**
139      * Writes an {@link IoBuffer} to the session's buffer.
140      * 
141      * @param session the session to which a write is requested
142      * @param data the data to buffer
143      */
144     private void write(IoSession session, IoBuffer data) {
145         IoBuffer dest = buffersMap.putIfAbsent(session, new IoBufferLazyInitializer(bufferSize));
146 
147         write(session, data, dest);
148     }
149 
150     /**
151      * Writes <code>data</code> {@link IoBuffer} to the <code>buf</code>
152      * {@link IoBuffer} which buffers write requests for the
153      * <code>session</code> {@ link IoSession} until buffer is full
154      * or manually flushed.
155      * 
156      * @param session the session where buffer will be written
157      * @param data the data to buffer
158      * @param buf the buffer where data will be temporarily written
159      */
160     private void write(IoSession session, IoBuffer data, IoBuffer buf) {
161         try {
162             int len = data.remaining();
163             if (len >= buf.capacity()) {
164                 /*
165                  * If the request length exceeds the size of the output buffer,
166                  * flush the output buffer and then write the data directly.
167                  */
168                 NextFilter nextFilter = session.getFilterChain().getNextFilter(this);
169                 internalFlush(nextFilter, session, buf);
170                 nextFilter.filterWrite(session, new DefaultWriteRequest(data));
171                 return;
172             }
173             if (len > (buf.limit() - buf.position())) {
174                 internalFlush(session.getFilterChain().getNextFilter(this), session, buf);
175             }
176             synchronized (buf) {
177                 buf.put(data);
178             }
179         } catch (Exception e) {
180             session.getFilterChain().fireExceptionCaught(e);
181         }
182     }
183 
184     /**
185      * Internal method that actually flushes the buffered data.
186      * 
187      * @param nextFilter the {@link NextFilter} of this filter
188      * @param session the session where buffer will be written
189      * @param buf the data to write
190      * @throws Exception if a write operation fails
191      */
192     private void internalFlush(NextFilter nextFilter, IoSession session, IoBuffer buf) throws Exception {
193         IoBuffer tmp = null;
194         synchronized (buf) {
195             buf.flip();
196             tmp = buf.duplicate();
197             buf.clear();
198         }
199         logger.debug("Flushing buffer: {}", tmp);
200         nextFilter.filterWrite(session, new DefaultWriteRequest(tmp));
201     }
202 
203     /**
204      * Flushes the buffered data.
205      * 
206      * @param session the session where buffer will be written
207      */
208     public void flush(IoSession session) {
209         try {
210             internalFlush(session.getFilterChain().getNextFilter(this), session, buffersMap.get(session));
211         } catch (Exception e) {
212             session.getFilterChain().fireExceptionCaught(e);
213         }
214     }
215 
216     /**
217      * Internal method that actually frees the {@link IoBuffer} that contains
218      * the buffered data that has not been flushed.
219      * 
220      * @param session the session we operate on
221      */
222     private void free(IoSession session) {
223         IoBuffer buf = buffersMap.remove(session);
224         if (buf != null) {
225             buf.free();
226         }
227     }
228 
229     /**
230      * {@inheritDoc}
231      */
232     @Override
233     public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
234         free(session);
235         nextFilter.exceptionCaught(session, cause);
236     }
237 
238     /**
239      * {@inheritDoc}
240      */
241     @Override
242     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
243         free(session);
244         nextFilter.sessionClosed(session);
245     }
246 }