001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.util.ArrayList;
023: import java.util.Collection;
024: import java.util.Iterator;
025: import java.util.List;
026: import java.util.concurrent.TimeUnit;
027:
028: /**
029: * A utility class that provides various convenience methods related with
030: * {@link IoSession} and {@link IoFuture}.
031: *
032: * @author The Apache MINA Project (dev@mina.apache.org)
033: * @version $Rev: 589474 $, $Date: 2007-10-28 21:03:14 -0600 (Sun, 28 Oct 2007) $
034: */
035: public class IoUtil {
036:
037: private static final IoSession[] EMPTY_SESSIONS = new IoSession[0];
038:
039: /**
040: * Writes the specified {@code message} to the specified {@code sessions}.
041: * If the specified {@code message} is an {@link IoBuffer}, the buffer is
042: * automatically duplicated using {@link IoBuffer#duplicate()}.
043: */
044: public static List<WriteFuture> broadcast(Object message,
045: Collection<IoSession> sessions) {
046: List<WriteFuture> answer = new ArrayList<WriteFuture>(sessions
047: .size());
048: broadcast(message, sessions.iterator(), answer);
049: return answer;
050: }
051:
052: /**
053: * Writes the specified {@code message} to the specified {@code sessions}.
054: * If the specified {@code message} is an {@link IoBuffer}, the buffer is
055: * automatically duplicated using {@link IoBuffer#duplicate()}.
056: */
057: public static List<WriteFuture> broadcast(Object message,
058: Iterable<IoSession> sessions) {
059: List<WriteFuture> answer = new ArrayList<WriteFuture>();
060: broadcast(message, sessions.iterator(), answer);
061: return answer;
062: }
063:
064: /**
065: * Writes the specified {@code message} to the specified {@code sessions}.
066: * If the specified {@code message} is an {@link IoBuffer}, the buffer is
067: * automatically duplicated using {@link IoBuffer#duplicate()}.
068: */
069: public static List<WriteFuture> broadcast(Object message,
070: Iterator<IoSession> sessions) {
071: List<WriteFuture> answer = new ArrayList<WriteFuture>();
072: broadcast(message, sessions, answer);
073: return answer;
074: }
075:
076: /**
077: * Writes the specified {@code message} to the specified {@code sessions}.
078: * If the specified {@code message} is an {@link IoBuffer}, the buffer is
079: * automatically duplicated using {@link IoBuffer#duplicate()}.
080: */
081: public static List<WriteFuture> broadcast(Object message,
082: IoSession... sessions) {
083: if (sessions == null) {
084: sessions = EMPTY_SESSIONS;
085: }
086:
087: List<WriteFuture> answer = new ArrayList<WriteFuture>(
088: sessions.length);
089: if (message instanceof IoBuffer) {
090: for (IoSession s : sessions) {
091: answer.add(s.write(((IoBuffer) message).duplicate()));
092: }
093: } else {
094: for (IoSession s : sessions) {
095: answer.add(s.write(message));
096: }
097: }
098: return answer;
099: }
100:
101: private static void broadcast(Object message,
102: Iterator<IoSession> sessions, Collection<WriteFuture> answer) {
103: if (message instanceof IoBuffer) {
104: while (sessions.hasNext()) {
105: IoSession s = sessions.next();
106: answer.add(s.write(((IoBuffer) message).duplicate()));
107: }
108: } else {
109: while (sessions.hasNext()) {
110: IoSession s = sessions.next();
111: answer.add(s.write(message));
112: }
113: }
114: }
115:
116: public static void await(Iterable<? extends IoFuture> futures)
117: throws InterruptedException {
118: for (IoFuture f : futures) {
119: f.await();
120: }
121: }
122:
123: public static void awaitUninterruptably(
124: Iterable<? extends IoFuture> futures) {
125: for (IoFuture f : futures) {
126: f.awaitUninterruptibly();
127: }
128: }
129:
130: public static boolean await(Iterable<? extends IoFuture> futures,
131: long timeout, TimeUnit unit) throws InterruptedException {
132: return await(futures, unit.toMillis(timeout));
133: }
134:
135: public static boolean await(Iterable<? extends IoFuture> futures,
136: long timeoutMillis) throws InterruptedException {
137: return await0(futures, timeoutMillis, true);
138: }
139:
140: public static boolean awaitUninterruptibly(
141: Iterable<? extends IoFuture> futures, long timeout,
142: TimeUnit unit) {
143: return awaitUninterruptibly(futures, unit.toMillis(timeout));
144: }
145:
146: public static boolean awaitUninterruptibly(
147: Iterable<? extends IoFuture> futures, long timeoutMillis) {
148: try {
149: return await0(futures, timeoutMillis, false);
150: } catch (InterruptedException e) {
151: throw new InternalError();
152: }
153: }
154:
155: private static boolean await0(Iterable<? extends IoFuture> futures,
156: long timeoutMillis, boolean interruptable)
157: throws InterruptedException {
158: long startTime = timeoutMillis <= 0 ? 0 : System
159: .currentTimeMillis();
160: long waitTime = timeoutMillis;
161:
162: boolean lastComplete = true;
163: Iterator<? extends IoFuture> i = futures.iterator();
164: while (i.hasNext()) {
165: IoFuture f = i.next();
166: do {
167: if (interruptable) {
168: lastComplete = f.await(waitTime);
169: } else {
170: lastComplete = f.awaitUninterruptibly(waitTime);
171: }
172:
173: waitTime = timeoutMillis
174: - (System.currentTimeMillis() - startTime);
175:
176: if (lastComplete || waitTime <= 0) {
177: break;
178: }
179: } while (!lastComplete);
180:
181: if (waitTime <= 0) {
182: break;
183: }
184: }
185:
186: return lastComplete && !i.hasNext();
187: }
188:
189: private IoUtil() {
190: }
191: }
|