View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.mina.core.buffer.IoBuffer;
29  import org.apache.mina.core.future.IoFuture;
30  import org.apache.mina.core.future.WriteFuture;
31  import org.apache.mina.core.session.IoSession;
32  
33  /**
34   * A utility class that provides various convenience methods related with
35   * {@link IoSession} and {@link IoFuture}.
36   * 
37   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
38   */
39  public class IoUtil {
40  
41      private static final IoSession[] EMPTY_SESSIONS = new IoSession[0];
42  
43      /**
44       * Writes the specified {@code message} to the specified {@code sessions}.
45       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
46       * automatically duplicated using {@link IoBuffer#duplicate()}.
47       */
48      public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) {
49          List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.size());
50          broadcast(message, sessions.iterator(), answer);
51          return answer;
52      }
53  
54      /**
55       * Writes the specified {@code message} to the specified {@code sessions}.
56       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
57       * automatically duplicated using {@link IoBuffer#duplicate()}.
58       */
59      public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) {
60          List<WriteFuture> answer = new ArrayList<WriteFuture>();
61          broadcast(message, sessions.iterator(), answer);
62          return answer;
63      }
64  
65      /**
66       * Writes the specified {@code message} to the specified {@code sessions}.
67       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
68       * automatically duplicated using {@link IoBuffer#duplicate()}.
69       */
70      public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) {
71          List<WriteFuture> answer = new ArrayList<WriteFuture>();
72          broadcast(message, sessions, answer);
73          return answer;
74      }
75  
76      /**
77       * Writes the specified {@code message} to the specified {@code sessions}.
78       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
79       * automatically duplicated using {@link IoBuffer#duplicate()}.
80       */
81      public static List<WriteFuture> broadcast(Object message, IoSession... sessions) {
82          if (sessions == null) {
83              sessions = EMPTY_SESSIONS;
84          }
85  
86          List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.length);
87          if (message instanceof IoBuffer) {
88              for (IoSession s : sessions) {
89                  answer.add(s.write(((IoBuffer) message).duplicate()));
90              }
91          } else {
92              for (IoSession s : sessions) {
93                  answer.add(s.write(message));
94              }
95          }
96          return answer;
97      }
98  
99      private static void broadcast(Object message, Iterator<IoSession> sessions, Collection<WriteFuture> answer) {
100         if (message instanceof IoBuffer) {
101             while (sessions.hasNext()) {
102                 IoSession s = sessions.next();
103                 answer.add(s.write(((IoBuffer) message).duplicate()));
104             }
105         } else {
106             while (sessions.hasNext()) {
107                 IoSession s = sessions.next();
108                 answer.add(s.write(message));
109             }
110         }
111     }
112 
113     public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException {
114         for (IoFuture f : futures) {
115             f.await();
116         }
117     }
118 
119     public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) {
120         for (IoFuture f : futures) {
121             f.awaitUninterruptibly();
122         }
123     }
124 
125     public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit)
126             throws InterruptedException {
127         return await(futures, unit.toMillis(timeout));
128     }
129 
130     public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException {
131         return await0(futures, timeoutMillis, true);
132     }
133 
134     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) {
135         return awaitUninterruptibly(futures, unit.toMillis(timeout));
136     }
137 
138     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) {
139         try {
140             return await0(futures, timeoutMillis, false);
141         } catch (InterruptedException e) {
142             throw new InternalError();
143         }
144     }
145 
146     private static boolean await0(Iterable<? extends IoFuture> futures, long timeoutMillis, boolean interruptable)
147             throws InterruptedException {
148         long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
149         long waitTime = timeoutMillis;
150 
151         boolean lastComplete = true;
152         Iterator<? extends IoFuture> i = futures.iterator();
153         while (i.hasNext()) {
154             IoFuture f = i.next();
155             do {
156                 if (interruptable) {
157                     lastComplete = f.await(waitTime);
158                 } else {
159                     lastComplete = f.awaitUninterruptibly(waitTime);
160                 }
161 
162                 waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
163 
164                 if (lastComplete || waitTime <= 0) {
165                     break;
166                 }
167             } while (!lastComplete);
168 
169             if (waitTime <= 0) {
170                 break;
171             }
172         }
173 
174         return lastComplete && !i.hasNext();
175     }
176 
177     private IoUtil() {
178         // Do nothing
179     }
180 }