View Javadoc

1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core.future;
21  
22  import java.util.ArrayList;
23  import java.util.List;
24  import java.util.concurrent.TimeUnit;
25  
26  import org.apache.mina.core.polling.AbstractPollingIoProcessor;
27  import org.apache.mina.core.service.IoProcessor;
28  import org.apache.mina.core.session.IoSession;
29  import org.apache.mina.util.ExceptionMonitor;
30  
31  /**
32   * A default implementation of {@link IoFuture} associated with
33   * an {@link IoSession}.
34   * 
35   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
36   */
37  public class DefaultIoFuture implements IoFuture {
38  
39      /** A number of seconds to wait between two deadlock controls ( 5 seconds ) */
40      private static final long DEAD_LOCK_CHECK_INTERVAL = 5000L;
41  
42      /** The associated session */
43      private final IoSession session;
44  
45      /** A lock used by the wait() method */
46      private final Object lock;
47  
48      private IoFutureListener<?> firstListener;
49  
50      private List<IoFutureListener<?>> otherListeners;
51  
52      private Object result;
53  
54      private boolean ready;
55  
56      private int waiters;
57  
58      /**
59       * Creates a new instance associated with an {@link IoSession}.
60       *
61       * @param session an {@link IoSession} which is associated with this future
62       */
63      public DefaultIoFuture(IoSession session) {
64          this.session = session;
65          this.lock = this;
66      }
67  
68      /**
69       * {@inheritDoc}
70       */
71      public IoSession getSession() {
72          return session;
73      }
74  
75      /**
76       * @deprecated Replaced with {@link #awaitUninterruptibly()}.
77       */
78      @Deprecated
79      public void join() {
80          awaitUninterruptibly();
81      }
82  
83      /**
84       * @deprecated Replaced with {@link #awaitUninterruptibly(long)}.
85       */
86      @Deprecated
87      public boolean join(long timeoutMillis) {
88          return awaitUninterruptibly(timeoutMillis);
89      }
90  
91      /**
92       * {@inheritDoc}
93       */
94      public IoFuture await() throws InterruptedException {
95          synchronized (lock) {
96              while (!ready) {
97                  waiters++;
98                  try {
99                      // Wait for a notify, or if no notify is called,
100                     // assume that we have a deadlock and exit the 
101                     // loop to check for a potential deadlock.
102                     lock.wait(DEAD_LOCK_CHECK_INTERVAL);
103                 } finally {
104                     waiters--;
105                     if (!ready) {
106                         checkDeadLock();
107                     }
108                 }
109             }
110         }
111         return this;
112     }
113 
114     /**
115      * {@inheritDoc}
116      */
117     public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
118         return await(unit.toMillis(timeout));
119     }
120 
121     /**
122      * {@inheritDoc}
123      */
124     public boolean await(long timeoutMillis) throws InterruptedException {
125         return await0(timeoutMillis, true);
126     }
127 
128     /**
129      * {@inheritDoc}
130      */
131     public IoFuture awaitUninterruptibly() {
132         try {
133             await0(Long.MAX_VALUE, false);
134         } catch (InterruptedException ie) {
135             // Do nothing : this catch is just mandatory by contract
136         }
137 
138         return this;
139     }
140 
141     /**
142      * {@inheritDoc}
143      */
144     public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
145         return awaitUninterruptibly(unit.toMillis(timeout));
146     }
147 
148     /**
149      * {@inheritDoc}
150      */
151     public boolean awaitUninterruptibly(long timeoutMillis) {
152         try {
153             return await0(timeoutMillis, false);
154         } catch (InterruptedException e) {
155             throw new InternalError();
156         }
157     }
158 
159     /**
160      * Wait for the Future to be ready. If the requested delay is 0 or 
161      * negative, this method immediately returns the value of the 
162      * 'ready' flag. 
163      * Every 5 second, the wait will be suspended to be able to check if 
164      * there is a deadlock or not.
165      * 
166      * @param timeoutMillis The delay we will wait for the Future to be ready
167      * @param interruptable Tells if the wait can be interrupted or not
168      * @return <code>true</code> if the Future is ready
169      * @throws InterruptedException If the thread has been interrupted
170      * when it's not allowed.
171      */
172     private boolean await0(long timeoutMillis, boolean interruptable) throws InterruptedException {
173         long endTime = System.currentTimeMillis() + timeoutMillis;
174 
175         if (endTime < 0) {
176             endTime = Long.MAX_VALUE;
177         }
178 
179         synchronized (lock) {
180             if (ready) {
181                 return ready;
182             } else if (timeoutMillis <= 0) {
183                 return ready;
184             }
185 
186             waiters++;
187 
188             try {
189                 for (;;) {
190                     try {
191                         long timeOut = Math.min(timeoutMillis, DEAD_LOCK_CHECK_INTERVAL);
192                         lock.wait(timeOut);
193                     } catch (InterruptedException e) {
194                         if (interruptable) {
195                             throw e;
196                         }
197                     }
198 
199                     if (ready) {
200                         return true;
201                     }
202 
203                     if (endTime < System.currentTimeMillis()) {
204                         return ready;
205                     }
206                 }
207             } finally {
208                 waiters--;
209                 if (!ready) {
210                     checkDeadLock();
211                 }
212             }
213         }
214     }
215 
216     /**
217      * 
218      * TODO checkDeadLock.
219      *
220      */
221     private void checkDeadLock() {
222         // Only read / write / connect / write future can cause dead lock. 
223         if (!(this instanceof CloseFuture || this instanceof WriteFuture || this instanceof ReadFuture || this instanceof ConnectFuture)) {
224             return;
225         }
226 
227         // Get the current thread stackTrace. 
228         // Using Thread.currentThread().getStackTrace() is the best solution,
229         // even if slightly less efficient than doing a new Exception().getStackTrace(),
230         // as internally, it does exactly the same thing. The advantage of using
231         // this solution is that we may benefit some improvement with some
232         // future versions of Java.
233         StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
234 
235         // Simple and quick check.
236         for (StackTraceElement s : stackTrace) {
237             if (AbstractPollingIoProcessor.class.getName().equals(s.getClassName())) {
238                 IllegalStateException e = new IllegalStateException("t");
239                 e.getStackTrace();
240                 throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
241                         + ".await() was invoked from an I/O processor thread.  " + "Please use "
242                         + IoFutureListener.class.getSimpleName() + " or configure a proper thread model alternatively.");
243             }
244         }
245 
246         // And then more precisely.
247         for (StackTraceElement s : stackTrace) {
248             try {
249                 Class<?> cls = DefaultIoFuture.class.getClassLoader().loadClass(s.getClassName());
250                 if (IoProcessor.class.isAssignableFrom(cls)) {
251                     throw new IllegalStateException("DEAD LOCK: " + IoFuture.class.getSimpleName()
252                             + ".await() was invoked from an I/O processor thread.  " + "Please use "
253                             + IoFutureListener.class.getSimpleName()
254                             + " or configure a proper thread model alternatively.");
255                 }
256             } catch (Exception cnfe) {
257                 // Ignore
258             }
259         }
260     }
261 
262     /**
263      * {@inheritDoc}
264      */
265     public boolean isDone() {
266         synchronized (lock) {
267             return ready;
268         }
269     }
270 
271     /**
272      * Sets the result of the asynchronous operation, and mark it as finished.
273      */
274     public void setValue(Object newValue) {
275         synchronized (lock) {
276             // Allow only once.
277             if (ready) {
278                 return;
279             }
280 
281             result = newValue;
282             ready = true;
283             if (waiters > 0) {
284                 lock.notifyAll();
285             }
286         }
287 
288         notifyListeners();
289     }
290 
291     /**
292      * Returns the result of the asynchronous operation.
293      */
294     protected Object getValue() {
295         synchronized (lock) {
296             return result;
297         }
298     }
299 
300     /**
301      * {@inheritDoc}
302      */
303     public IoFuture addListener(IoFutureListener<?> listener) {
304         if (listener == null) {
305             throw new IllegalArgumentException("listener");
306         }
307 
308         boolean notifyNow = false;
309         synchronized (lock) {
310             if (ready) {
311                 notifyNow = true;
312             } else {
313                 if (firstListener == null) {
314                     firstListener = listener;
315                 } else {
316                     if (otherListeners == null) {
317                         otherListeners = new ArrayList<IoFutureListener<?>>(1);
318                     }
319                     otherListeners.add(listener);
320                 }
321             }
322         }
323 
324         if (notifyNow) {
325             notifyListener(listener);
326         }
327         return this;
328     }
329 
330     /**
331      * {@inheritDoc}
332      */
333     public IoFuture removeListener(IoFutureListener<?> listener) {
334         if (listener == null) {
335             throw new IllegalArgumentException("listener");
336         }
337 
338         synchronized (lock) {
339             if (!ready) {
340                 if (listener == firstListener) {
341                     if (otherListeners != null && !otherListeners.isEmpty()) {
342                         firstListener = otherListeners.remove(0);
343                     } else {
344                         firstListener = null;
345                     }
346                 } else if (otherListeners != null) {
347                     otherListeners.remove(listener);
348                 }
349             }
350         }
351 
352         return this;
353     }
354 
355     private void notifyListeners() {
356         // There won't be any visibility problem or concurrent modification
357         // because 'ready' flag will be checked against both addListener and
358         // removeListener calls.
359         if (firstListener != null) {
360             notifyListener(firstListener);
361             firstListener = null;
362 
363             if (otherListeners != null) {
364                 for (IoFutureListener<?> l : otherListeners) {
365                     notifyListener(l);
366                 }
367                 otherListeners = null;
368             }
369         }
370     }
371 
372     @SuppressWarnings("unchecked")
373     private void notifyListener(IoFutureListener l) {
374         try {
375             l.operationComplete(this);
376         } catch (Throwable t) {
377             ExceptionMonitor.getInstance().exceptionCaught(t);
378         }
379     }
380 }