View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.filter.reqres;
21  
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.Iterator;
25  import java.util.LinkedHashSet;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.Set;
29  import java.util.concurrent.ConcurrentHashMap;
30  import java.util.concurrent.ScheduledExecutorService;
31  import java.util.concurrent.ScheduledFuture;
32  import java.util.concurrent.TimeUnit;
33  
34  import org.apache.mina.core.filterchain.IoFilterChain;
35  import org.apache.mina.core.session.AttributeKey;
36  import org.apache.mina.core.session.IoSession;
37  import org.apache.mina.core.write.WriteRequest;
38  import org.apache.mina.filter.util.WriteRequestFilter;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  /**
43   * TODO Add documentation
44   * 
45   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
46   * @org.apache.xbean.XBean
47   */
48  public class RequestResponseFilter extends WriteRequestFilter {
49  
50      private final AttributeKey RESPONSE_INSPECTOR = new AttributeKey(getClass(), "responseInspector");
51  
52      private final AttributeKey REQUEST_STORE = new AttributeKey(getClass(), "requestStore");
53  
54      private final AttributeKey UNRESPONDED_REQUEST_STORE = new AttributeKey(getClass(), "unrespondedRequestStore");
55  
56      private final ResponseInspectorFactory responseInspectorFactory;
57  
58      private final ScheduledExecutorService timeoutScheduler;
59  
60      private final static Logger LOGGER = LoggerFactory.getLogger(RequestResponseFilter.class);
61  
62      public RequestResponseFilter(final ResponseInspector responseInspector, ScheduledExecutorService timeoutScheduler) {
63          if (responseInspector == null) {
64              throw new IllegalArgumentException("responseInspector");
65          }
66          if (timeoutScheduler == null) {
67              throw new IllegalArgumentException("timeoutScheduler");
68          }
69          this.responseInspectorFactory = new ResponseInspectorFactory() {
70              public ResponseInspector getResponseInspector() {
71                  return responseInspector;
72              }
73          };
74          this.timeoutScheduler = timeoutScheduler;
75      }
76  
77      public RequestResponseFilter(ResponseInspectorFactory responseInspectorFactory,
78              ScheduledExecutorService timeoutScheduler) {
79          if (responseInspectorFactory == null) {
80              throw new IllegalArgumentException("responseInspectorFactory");
81          }
82          if (timeoutScheduler == null) {
83              throw new IllegalArgumentException("timeoutScheduler");
84          }
85          this.responseInspectorFactory = responseInspectorFactory;
86          this.timeoutScheduler = timeoutScheduler;
87      }
88  
89      @Override
90      public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
91          if (parent.contains(this)) {
92              throw new IllegalArgumentException(
93                      "You can't add the same filter instance more than once.  Create another instance and add it.");
94          }
95  
96          IoSession session = parent.getSession();
97          session.setAttribute(RESPONSE_INSPECTOR, responseInspectorFactory.getResponseInspector());
98          session.setAttribute(REQUEST_STORE, createRequestStore(session));
99          session.setAttribute(UNRESPONDED_REQUEST_STORE, createUnrespondedRequestStore(session));
100     }
101 
102     @Override
103     public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception {
104         IoSession session = parent.getSession();
105 
106         destroyUnrespondedRequestStore(getUnrespondedRequestStore(session));
107         destroyRequestStore(getRequestStore(session));
108 
109         session.removeAttribute(UNRESPONDED_REQUEST_STORE);
110         session.removeAttribute(REQUEST_STORE);
111         session.removeAttribute(RESPONSE_INSPECTOR);
112     }
113 
114     @Override
115     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
116         ResponseInspector responseInspector = (ResponseInspector) session.getAttribute(RESPONSE_INSPECTOR);
117         Object requestId = responseInspector.getRequestId(message);
118         if (requestId == null) {
119             // Not a response message.  Ignore.
120             nextFilter.messageReceived(session, message);
121             return;
122         }
123 
124         // Retrieve (or remove) the corresponding request.
125         ResponseType type = responseInspector.getResponseType(message);
126         if (type == null) {
127             nextFilter.exceptionCaught(session, new IllegalStateException(responseInspector.getClass().getName()
128                     + "#getResponseType() may not return null."));
129         }
130 
131         Map<Object, Request> requestStore = getRequestStore(session);
132 
133         Request request;
134         switch (type) {
135         case WHOLE:
136         case PARTIAL_LAST:
137             synchronized (requestStore) {
138                 request = requestStore.remove(requestId);
139             }
140             break;
141         case PARTIAL:
142             synchronized (requestStore) {
143                 request = requestStore.get(requestId);
144             }
145             break;
146         default:
147             throw new InternalError();
148         }
149 
150         if (request == null) {
151             // A response message without request. Swallow the event because
152             // the response might have arrived too late.
153             if (LOGGER.isWarnEnabled()) {
154                 LOGGER.warn("Unknown request ID '" + requestId + "' for the response message. Timed out already?: "
155                         + message);
156             }
157         } else {
158             // Found a matching request.
159             // Cancel the timeout task if needed.
160             if (type != ResponseType.PARTIAL) {
161                 ScheduledFuture<?> scheduledFuture = request.getTimeoutFuture();
162                 if (scheduledFuture != null) {
163                     scheduledFuture.cancel(false);
164                     Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
165                     synchronized (unrespondedRequests) {
166                         unrespondedRequests.remove(request);
167                     }
168                 }
169             }
170 
171             // And forward the event.
172             Response response = new Response(request, message, type);
173             request.signal(response);
174             nextFilter.messageReceived(session, response);
175         }
176     }
177 
178     @Override
179     protected Object doFilterWrite(final NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
180             throws Exception {
181         Object message = writeRequest.getMessage();
182         if (!(message instanceof Request)) {
183             return null;
184         }
185 
186         final Request request = (Request) message;
187         if (request.getTimeoutFuture() != null) {
188             throw new IllegalArgumentException("Request can not be reused.");
189         }
190 
191         Map<Object, Request> requestStore = getRequestStore(session);
192         Object oldValue = null;
193         Object requestId = request.getId();
194         synchronized (requestStore) {
195             oldValue = requestStore.get(requestId);
196             if (oldValue == null) {
197                 requestStore.put(requestId, request);
198             }
199         }
200         if (oldValue != null) {
201             throw new IllegalStateException("Duplicate request ID: " + request.getId());
202         }
203 
204         // Schedule a task to be executed on timeout.
205         TimeoutTask timeoutTask = new TimeoutTask(nextFilter, request, session);
206         ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(timeoutTask, request.getTimeoutMillis(),
207                 TimeUnit.MILLISECONDS);
208         request.setTimeoutTask(timeoutTask);
209         request.setTimeoutFuture(timeoutFuture);
210 
211         // Add the timeout task to the unfinished task set.
212         Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
213         synchronized (unrespondedRequests) {
214             unrespondedRequests.add(request);
215         }
216 
217         return request.getMessage();
218     }
219 
220     @Override
221     public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
222         // Copy the unfinished task set to avoid unnecessary lock acquisition.
223         // Copying will be cheap because there won't be that many requests queued.
224         Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
225         List<Request> unrespondedRequestsCopy;
226         synchronized (unrespondedRequests) {
227             unrespondedRequestsCopy = new ArrayList<Request>(unrespondedRequests);
228             unrespondedRequests.clear();
229         }
230 
231         // Generate timeout artificially.
232         for (Request r : unrespondedRequestsCopy) {
233             if (r.getTimeoutFuture().cancel(false)) {
234                 r.getTimeoutTask().run();
235             }
236         }
237 
238         // Clear the request store just in case we missed something, though it's unlikely.
239         Map<Object, Request> requestStore = getRequestStore(session);
240         synchronized (requestStore) {
241             requestStore.clear();
242         }
243 
244         // Now tell the main subject.
245         nextFilter.sessionClosed(session);
246     }
247 
248     @SuppressWarnings("unchecked")
249     private Map<Object, Request> getRequestStore(IoSession session) {
250         return (Map<Object, Request>) session.getAttribute(REQUEST_STORE);
251     }
252 
253     @SuppressWarnings("unchecked")
254     private Set<Request> getUnrespondedRequestStore(IoSession session) {
255         return (Set<Request>) session.getAttribute(UNRESPONDED_REQUEST_STORE);
256     }
257 
258     /**
259      * Returns a {@link Map} which stores {@code messageId}-{@link Request}
260      * pairs whose {@link Response}s are not received yet.  Please override
261      * this method if you need to use other {@link Map} implementation
262      * than the default one ({@link HashMap}).
263      */
264     protected Map<Object, Request> createRequestStore(IoSession session) {
265         return new ConcurrentHashMap<Object, Request>();
266     }
267 
268     /**
269      * Returns a {@link Set} which stores {@link Request} whose
270      * {@link Response}s are not received yet. Please override
271      * this method if you need to use other {@link Set} implementation
272      * than the default one ({@link LinkedHashSet}).  Please note that
273      * the {@link Iterator} of the returned {@link Set} have to iterate
274      * its elements in the insertion order to ensure that
275      * {@link RequestTimeoutException}s are thrown in the order which
276      * {@link Request}s were written.  If you don't need to guarantee
277      * the order of thrown exceptions, any {@link Set} implementation
278      * can be used.
279      */
280     protected Set<Request> createUnrespondedRequestStore(IoSession session) {
281         return new LinkedHashSet<Request>();
282     }
283 
284     /**
285      * Releases any resources related with the {@link Map} created by
286      * {@link #createRequestStore(IoSession)}.  This method is useful
287      * if you override {@link #createRequestStore(IoSession)}.
288      *
289      * @param requestStore what you returned in {@link #createRequestStore(IoSession)}
290      */
291     protected void destroyRequestStore(Map<Object, Request> requestStore) {
292         // Do nothing
293     }
294 
295     /**
296      * Releases any resources related with the {@link Set} created by
297      * {@link #createUnrespondedRequestStore(IoSession)}.  This method is
298      * useful if you override {@link #createUnrespondedRequestStore(IoSession)}.
299      *
300      * @param unrespondedRequestStore what you returned in {@link #createUnrespondedRequestStore(IoSession)}
301      */
302     protected void destroyUnrespondedRequestStore(Set<Request> unrespondedRequestStore) {
303         // Do nothing
304     }
305 
306     private class TimeoutTask implements Runnable {
307         private final NextFilter filter;
308 
309         private final Request request;
310 
311         private final IoSession session;
312 
313         private TimeoutTask(NextFilter filter, Request request, IoSession session) {
314             this.filter = filter;
315             this.request = request;
316             this.session = session;
317         }
318 
319         public void run() {
320             Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
321             if (unrespondedRequests != null) {
322                 synchronized (unrespondedRequests) {
323                     unrespondedRequests.remove(request);
324                 }
325             }
326 
327             Map<Object, Request> requestStore = getRequestStore(session);
328             Object requestId = request.getId();
329             boolean timedOut;
330             synchronized (requestStore) {
331                 if (requestStore.get(requestId) == request) {
332                     requestStore.remove(requestId);
333                     timedOut = true;
334                 } else {
335                     timedOut = false;
336                 }
337             }
338 
339             if (timedOut) {
340                 // Throw the exception only when it's really timed out.
341                 RequestTimeoutException e = new RequestTimeoutException(request);
342                 request.signal(e);
343                 filter.exceptionCaught(session, e);
344             }
345         }
346     }
347 }