1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 *
19 */
20 package org.apache.mina.filter.buffer;
21
22 import java.io.BufferedOutputStream;
23 import java.util.concurrent.ConcurrentHashMap;
24
25 import org.apache.mina.core.buffer.IoBuffer;
26 import org.apache.mina.core.filterchain.IoFilter;
27 import org.apache.mina.core.filterchain.IoFilterAdapter;
28 import org.apache.mina.core.session.IoSession;
29 import org.apache.mina.core.write.DefaultWriteRequest;
30 import org.apache.mina.core.write.WriteRequest;
31 import org.apache.mina.filter.codec.ProtocolCodecFilter;
32 import org.apache.mina.util.LazyInitializedCacheMap;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 /**
37 * An {@link IoFilter} implementation used to buffer outgoing {@link WriteRequest} almost
38 * like what {@link BufferedOutputStream} does. Using this filter allows to be less dependent
39 * from network latency. It is also useful when a session is generating very small messages
40 * too frequently and consequently generating unnecessary traffic overhead.
41 *
42 * Please note that it should always be placed before the {@link ProtocolCodecFilter}
43 * as it only handles {@link WriteRequest}'s carrying {@link IoBuffer} objects.
44 *
45 * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46 * @since MINA 2.0.0-M2
47 * @org.apache.xbean.XBean
48 */
49 public final class BufferedWriteFilter extends IoFilterAdapter {
50 private final Logger logger = LoggerFactory.getLogger(BufferedWriteFilter.class);
51
52 /**
53 * Default buffer size value in bytes.
54 */
55 public final static int DEFAULT_BUFFER_SIZE = 8192;
56
57 /**
58 * The buffer size allocated for each new session's buffer.
59 */
60 private int bufferSize = DEFAULT_BUFFER_SIZE;
61
62 /**
63 * The map that matches an {@link IoSession} and it's {@link IoBuffer}
64 * buffer.
65 */
66 private final LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap;
67
68 /**
69 * Default constructor. Sets buffer size to {@link #DEFAULT_BUFFER_SIZE}
70 * bytes. Uses a default instance of {@link ConcurrentHashMap}.
71 */
72 public BufferedWriteFilter() {
73 this(DEFAULT_BUFFER_SIZE, null);
74 }
75
76 /**
77 * Constructor which sets buffer size to <code>bufferSize</code>.Uses a default
78 * instance of {@link ConcurrentHashMap}.
79 *
80 * @param bufferSize the new buffer size
81 */
82 public BufferedWriteFilter(int bufferSize) {
83 this(bufferSize, null);
84 }
85
86 /**
87 * Constructor which sets buffer size to <code>bufferSize</code>. If
88 * <code>buffersMap</code> is null then a default instance of {@link ConcurrentHashMap}
89 * is created else the provided instance is used.
90 *
91 * @param bufferSize the new buffer size
92 * @param buffersMap the map to use for storing each session buffer
93 */
94 public BufferedWriteFilter(int bufferSize, LazyInitializedCacheMap<IoSession, IoBuffer> buffersMap) {
95 super();
96 this.bufferSize = bufferSize;
97 if (buffersMap == null) {
98 this.buffersMap = new LazyInitializedCacheMap<IoSession, IoBuffer>();
99 } else {
100 this.buffersMap = buffersMap;
101 }
102 }
103
104 /**
105 * Returns buffer size.
106 */
107 public int getBufferSize() {
108 return bufferSize;
109 }
110
111 /**
112 * Sets the buffer size but only for the newly created buffers.
113 *
114 * @param bufferSize the new buffer size
115 */
116 public void setBufferSize(int bufferSize) {
117 this.bufferSize = bufferSize;
118 }
119
120 /**
121 * {@inheritDoc}
122 *
123 * @throws Exception if <code>writeRequest.message</code> isn't an
124 * {@link IoBuffer} instance.
125 */
126 @Override
127 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception {
128
129 Object data = writeRequest.getMessage();
130
131 if (data instanceof IoBuffer) {
132 write(session, (IoBuffer) data);
133 } else {
134 throw new IllegalArgumentException("This filter should only buffer IoBuffer objects");
135 }
136 }
137
138 /**
139 * Writes an {@link IoBuffer} to the session's buffer.
140 *
141 * @param session the session to which a write is requested
142 * @param data the data to buffer
143 */
144 private void write(IoSession session, IoBuffer data) {
145 IoBuffer dest = buffersMap.putIfAbsent(session, new IoBufferLazyInitializer(bufferSize));
146
147 write(session, data, dest);
148 }
149
150 /**
151 * Writes <code>data</code> {@link IoBuffer} to the <code>buf</code>
152 * {@link IoBuffer} which buffers write requests for the
153 * <code>session</code> {@ link IoSession} until buffer is full
154 * or manually flushed.
155 *
156 * @param session the session where buffer will be written
157 * @param data the data to buffer
158 * @param buf the buffer where data will be temporarily written
159 */
160 private void write(IoSession session, IoBuffer data, IoBuffer buf) {
161 try {
162 int len = data.remaining();
163 if (len >= buf.capacity()) {
164 /*
165 * If the request length exceeds the size of the output buffer,
166 * flush the output buffer and then write the data directly.
167 */
168 NextFilter nextFilter = session.getFilterChain().getNextFilter(this);
169 internalFlush(nextFilter, session, buf);
170 nextFilter.filterWrite(session, new DefaultWriteRequest(data));
171 return;
172 }
173 if (len > (buf.limit() - buf.position())) {
174 internalFlush(session.getFilterChain().getNextFilter(this), session, buf);
175 }
176 synchronized (buf) {
177 buf.put(data);
178 }
179 } catch (Throwable e) {
180 session.getFilterChain().fireExceptionCaught(e);
181 }
182 }
183
184 /**
185 * Internal method that actually flushes the buffered data.
186 *
187 * @param nextFilter the {@link NextFilter} of this filter
188 * @param session the session where buffer will be written
189 * @param buf the data to write
190 * @throws Exception if a write operation fails
191 */
192 private void internalFlush(NextFilter nextFilter, IoSession session, IoBuffer buf) throws Exception {
193 IoBuffer tmp = null;
194 synchronized (buf) {
195 buf.flip();
196 tmp = buf.duplicate();
197 buf.clear();
198 }
199 logger.debug("Flushing buffer: {}", tmp);
200 nextFilter.filterWrite(session, new DefaultWriteRequest(tmp));
201 }
202
203 /**
204 * Flushes the buffered data.
205 *
206 * @param session the session where buffer will be written
207 */
208 public void flush(IoSession session) {
209 try {
210 internalFlush(session.getFilterChain().getNextFilter(this), session, buffersMap.get(session));
211 } catch (Throwable e) {
212 session.getFilterChain().fireExceptionCaught(e);
213 }
214 }
215
216 /**
217 * Internal method that actually frees the {@link IoBuffer} that contains
218 * the buffered data that has not been flushed.
219 *
220 * @param session the session we operate on
221 */
222 private void free(IoSession session) {
223 IoBuffer buf = buffersMap.remove(session);
224 if (buf != null) {
225 buf.free();
226 }
227 }
228
229 /**
230 * {@inheritDoc}
231 */
232 @Override
233 public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
234 free(session);
235 nextFilter.exceptionCaught(session, cause);
236 }
237
238 /**
239 * {@inheritDoc}
240 */
241 @Override
242 public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
243 free(session);
244 nextFilter.sessionClosed(session);
245 }
246 }