001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: *
019: */
020: package org.apache.mina.common;
021:
022: import java.util.ArrayList;
023: import java.util.List;
024: import java.util.concurrent.TimeUnit;
025:
026: /**
027: * A default implementation of {@link IoFuture}.
028: *
029: * @author The Apache MINA Project (dev@mina.apache.org)
030: * @version $Rev: 600818 $, $Date: 2007-12-04 00:37:24 -0700 (Tue, 04 Dec 2007) $
031: */
032: public class DefaultIoFuture implements IoFuture {
033:
034: private static final int DEAD_LOCK_CHECK_INTERVAL = 5000;
035:
036: private final IoSession session;
037: private final Object lock;
038: private IoFutureListener<?> firstListener;
039: private List<IoFutureListener<?>> otherListeners;
040: private Object result;
041: private boolean ready;
042: private int waiters;
043:
044: /**
045: * Creates a new instance.
046: *
047: * @param session an {@link IoSession} which is associated with this future
048: */
049: public DefaultIoFuture(IoSession session) {
050: this .session = session;
051: this .lock = this ;
052: }
053:
054: public IoSession getSession() {
055: return session;
056: }
057:
058: public void join() {
059: awaitUninterruptibly();
060: }
061:
062: public boolean join(long timeoutMillis) {
063: return awaitUninterruptibly(timeoutMillis);
064: }
065:
066: public IoFuture await() throws InterruptedException {
067: synchronized (lock) {
068: while (!ready) {
069: waiters++;
070: try {
071: lock.wait(DEAD_LOCK_CHECK_INTERVAL);
072: checkDeadLock();
073: } finally {
074: waiters--;
075: }
076: }
077: }
078: return this ;
079: }
080:
081: public boolean await(long timeout, TimeUnit unit)
082: throws InterruptedException {
083: return await(unit.toMillis(timeout));
084: }
085:
086: public boolean await(long timeoutMillis)
087: throws InterruptedException {
088: return await0(timeoutMillis, true);
089: }
090:
091: public IoFuture awaitUninterruptibly() {
092: synchronized (lock) {
093: while (!ready) {
094: waiters++;
095: try {
096: lock.wait(DEAD_LOCK_CHECK_INTERVAL);
097: } catch (InterruptedException e) {
098: } finally {
099: waiters--;
100: if (!ready) {
101: checkDeadLock();
102: }
103: }
104: }
105: }
106:
107: return this ;
108: }
109:
110: public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
111: return awaitUninterruptibly(unit.toMillis(timeout));
112: }
113:
114: public boolean awaitUninterruptibly(long timeoutMillis) {
115: try {
116: return await0(timeoutMillis, false);
117: } catch (InterruptedException e) {
118: throw new InternalError();
119: }
120: }
121:
122: private boolean await0(long timeoutMillis, boolean interruptable)
123: throws InterruptedException {
124: long startTime = timeoutMillis <= 0 ? 0 : System
125: .currentTimeMillis();
126: long waitTime = timeoutMillis;
127:
128: synchronized (lock) {
129: if (ready) {
130: return ready;
131: } else if (waitTime <= 0) {
132: return ready;
133: }
134:
135: waiters++;
136: try {
137: for (;;) {
138: try {
139: lock.wait(Math.min(waitTime,
140: DEAD_LOCK_CHECK_INTERVAL));
141: } catch (InterruptedException e) {
142: if (interruptable) {
143: throw e;
144: }
145: }
146:
147: if (ready) {
148: return true;
149: } else {
150: waitTime = timeoutMillis
151: - (System.currentTimeMillis() - startTime);
152: if (waitTime <= 0) {
153: return ready;
154: }
155: }
156: }
157: } finally {
158: waiters--;
159: if (!ready) {
160: checkDeadLock();
161: }
162: }
163: }
164: }
165:
166: private void checkDeadLock() {
167: // Only read / write / connect / write future can cause dead lock.
168: if (!(this instanceof CloseFuture
169: || this instanceof WriteFuture
170: || this instanceof ReadFuture || this instanceof ConnectFuture)) {
171: return;
172: }
173:
174: IllegalStateException e = new IllegalStateException(
175: "DEAD LOCK: "
176: + IoFuture.class.getSimpleName()
177: + ".await() was invoked from an I/O processor thread. "
178: + "Please use "
179: + IoFutureListener.class.getSimpleName()
180: + " or configure a proper thread model alternatively.");
181:
182: StackTraceElement[] stackTrace = e.getStackTrace();
183:
184: // Simple and quick check.
185: for (StackTraceElement s : stackTrace) {
186: if (AbstractPollingIoProcessor.class.getName().equals(
187: s.getClassName())) {
188: throw e;
189: }
190: }
191:
192: // And then more precisely.
193: for (StackTraceElement s : stackTrace) {
194: try {
195: Class<?> cls = DefaultIoFuture.class.getClassLoader()
196: .loadClass(s.getClassName());
197: if (IoProcessor.class.isAssignableFrom(cls)) {
198: throw e;
199: }
200: } catch (Exception cnfe) {
201: // Ignore
202: }
203: }
204: }
205:
206: public boolean isReady() {
207: synchronized (lock) {
208: return ready;
209: }
210: }
211:
212: /**
213: * Sets the result of the asynchronous operation, and mark it as finished.
214: */
215: protected void setValue(Object newValue) {
216: synchronized (lock) {
217: // Allow only once.
218: if (ready) {
219: return;
220: }
221:
222: result = newValue;
223: ready = true;
224: if (waiters > 0) {
225: lock.notifyAll();
226: }
227: }
228:
229: notifyListeners();
230: }
231:
232: /**
233: * Returns the result of the asynchronous operation.
234: */
235: protected Object getValue() {
236: synchronized (lock) {
237: return result;
238: }
239: }
240:
241: public IoFuture addListener(IoFutureListener<?> listener) {
242: if (listener == null) {
243: throw new NullPointerException("listener");
244: }
245:
246: boolean notifyNow = false;
247: synchronized (lock) {
248: if (ready) {
249: notifyNow = true;
250: } else {
251: if (firstListener == null) {
252: firstListener = listener;
253: } else {
254: if (otherListeners == null) {
255: otherListeners = new ArrayList<IoFutureListener<?>>(
256: 1);
257: }
258: otherListeners.add(listener);
259: }
260: }
261: }
262:
263: if (notifyNow) {
264: notifyListener(listener);
265: }
266: return this ;
267: }
268:
269: public IoFuture removeListener(IoFutureListener<?> listener) {
270: if (listener == null) {
271: throw new NullPointerException("listener");
272: }
273:
274: synchronized (lock) {
275: if (!ready) {
276: if (listener == firstListener) {
277: if (otherListeners != null
278: && !otherListeners.isEmpty()) {
279: firstListener = otherListeners.remove(0);
280: } else {
281: firstListener = null;
282: }
283: } else if (otherListeners != null) {
284: otherListeners.remove(listener);
285: }
286: }
287: }
288:
289: return this ;
290: }
291:
292: private void notifyListeners() {
293: // There won't be any visibility problem or concurrent modification
294: // because 'ready' flag will be checked against both addListener and
295: // removeListener calls.
296: if (firstListener != null) {
297: notifyListener(firstListener);
298: firstListener = null;
299:
300: if (otherListeners != null) {
301: for (IoFutureListener<?> l : otherListeners) {
302: notifyListener(l);
303: }
304: otherListeners = null;
305: }
306: }
307: }
308:
309: @SuppressWarnings("unchecked")
310: private void notifyListener(IoFutureListener l) {
311: try {
312: l.operationComplete(this );
313: } catch (Throwable t) {
314: ExceptionMonitor.getInstance().exceptionCaught(t);
315: }
316: }
317: }
|