View Javadoc
1   /*
2    *  Licensed to the Apache Software Foundation (ASF) under one
3    *  or more contributor license agreements.  See the NOTICE file
4    *  distributed with this work for additional information
5    *  regarding copyright ownership.  The ASF licenses this file
6    *  to you under the Apache License, Version 2.0 (the
7    *  "License"); you may not use this file except in compliance
8    *  with the License.  You may obtain a copy of the License at
9    *
10   *    http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *  Unless required by applicable law or agreed to in writing,
13   *  software distributed under the License is distributed on an
14   *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *  KIND, either express or implied.  See the License for the
16   *  specific language governing permissions and limitations
17   *  under the License.
18   *
19   */
20  package org.apache.mina.core;
21  
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Iterator;
25  import java.util.List;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.apache.mina.core.buffer.IoBuffer;
29  import org.apache.mina.core.future.IoFuture;
30  import org.apache.mina.core.future.WriteFuture;
31  import org.apache.mina.core.session.IoSession;
32  
33  /**
34   * A utility class that provides various convenience methods related with
35   * {@link IoSession} and {@link IoFuture}.
36   * 
37   * @author <a href="http://mina.apache.org">Apache MINA Project</a>
38   */
39  public final class IoUtil {
40      private static final IoSession[] EMPTY_SESSIONS = new IoSession[0];
41  
42      /**
43       * Writes the specified {@code message} to the specified {@code sessions}.
44       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
45       * automatically duplicated using {@link IoBuffer#duplicate()}.
46       * 
47       * @param message The message to broadcast
48       * @param sessions The sessions that will receive the message
49       * @return The list of WriteFuture created for each broadcasted message
50       */
51      public static List<WriteFuture> broadcast(Object message, Collection<IoSession> sessions) {
52          List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.size());
53          broadcast(message, sessions.iterator(), answer);
54          return answer;
55      }
56  
57      /**
58       * Writes the specified {@code message} to the specified {@code sessions}.
59       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
60       * automatically duplicated using {@link IoBuffer#duplicate()}.
61       * 
62       * @param message The message to broadcast
63       * @param sessions The sessions that will receive the message
64       * @return The list of WriteFuture created for each broadcasted message
65       */
66      public static List<WriteFuture> broadcast(Object message, Iterable<IoSession> sessions) {
67          List<WriteFuture> answer = new ArrayList<WriteFuture>();
68          broadcast(message, sessions.iterator(), answer);
69          return answer;
70      }
71  
72      /**
73       * Writes the specified {@code message} to the specified {@code sessions}.
74       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
75       * automatically duplicated using {@link IoBuffer#duplicate()}.
76       * 
77       * @param message The message to write
78       * @param sessions The sessions the message has to be written to
79       * @return The list of {@link WriteFuture} for the written messages
80       */
81      public static List<WriteFuture> broadcast(Object message, Iterator<IoSession> sessions) {
82          List<WriteFuture> answer = new ArrayList<WriteFuture>();
83          broadcast(message, sessions, answer);
84          return answer;
85      }
86  
87      /**
88       * Writes the specified {@code message} to the specified {@code sessions}.
89       * If the specified {@code message} is an {@link IoBuffer}, the buffer is
90       * automatically duplicated using {@link IoBuffer#duplicate()}.
91       * 
92       * @param message The message to write
93       * @param sessions The sessions the message has to be written to
94       * @return The list of {@link WriteFuture} for the written messages
95       */
96      public static List<WriteFuture> broadcast(Object message, IoSession... sessions) {
97          if (sessions == null) {
98              sessions = EMPTY_SESSIONS;
99          }
100 
101         List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions.length);
102         if (message instanceof IoBuffer) {
103             for (IoSession s : sessions) {
104                 answer.add(s.write(((IoBuffer) message).duplicate()));
105             }
106         } else {
107             for (IoSession s : sessions) {
108                 answer.add(s.write(message));
109             }
110         }
111         return answer;
112     }
113 
114     private static void broadcast(Object message, Iterator<IoSession> sessions, Collection<WriteFuture> answer) {
115         if (message instanceof IoBuffer) {
116             while (sessions.hasNext()) {
117                 IoSession s = sessions.next();
118                 answer.add(s.write(((IoBuffer) message).duplicate()));
119             }
120         } else {
121             while (sessions.hasNext()) {
122                 IoSession s = sessions.next();
123                 answer.add(s.write(message));
124             }
125         }
126     }
127 
128     public static void await(Iterable<? extends IoFuture> futures) throws InterruptedException {
129         for (IoFuture f : futures) {
130             f.await();
131         }
132     }
133 
134     public static void awaitUninterruptably(Iterable<? extends IoFuture> futures) {
135         for (IoFuture f : futures) {
136             f.awaitUninterruptibly();
137         }
138     }
139 
140     public static boolean await(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit)
141             throws InterruptedException {
142         return await(futures, unit.toMillis(timeout));
143     }
144 
145     public static boolean await(Iterable<? extends IoFuture> futures, long timeoutMillis) throws InterruptedException {
146         return await0(futures, timeoutMillis, true);
147     }
148 
149     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeout, TimeUnit unit) {
150         return awaitUninterruptibly(futures, unit.toMillis(timeout));
151     }
152 
153     public static boolean awaitUninterruptibly(Iterable<? extends IoFuture> futures, long timeoutMillis) {
154         try {
155             return await0(futures, timeoutMillis, false);
156         } catch (InterruptedException e) {
157             throw new InternalError();
158         }
159     }
160 
161     private static boolean await0(Iterable<? extends IoFuture> futures, long timeoutMillis, boolean interruptable)
162             throws InterruptedException {
163         long startTime = timeoutMillis <= 0 ? 0 : System.currentTimeMillis();
164         long waitTime = timeoutMillis;
165 
166         boolean lastComplete = true;
167         Iterator<? extends IoFuture> i = futures.iterator();
168         while (i.hasNext()) {
169             IoFuture f = i.next();
170             do {
171                 if (interruptable) {
172                     lastComplete = f.await(waitTime);
173                 } else {
174                     lastComplete = f.awaitUninterruptibly(waitTime);
175                 }
176 
177                 waitTime = timeoutMillis - (System.currentTimeMillis() - startTime);
178 
179                 if (lastComplete || waitTime <= 0) {
180                     break;
181                 }
182             } while (!lastComplete);
183 
184             if (waitTime <= 0) {
185                 break;
186             }
187         }
188 
189         return lastComplete && !i.hasNext();
190     }
191 
192     private IoUtil() {
193         // Do nothing
194     }
195 }