001: package org.mortbay.io.nio;
002:
003: import java.io.IOException;
004: import java.nio.channels.ClosedChannelException;
005: import java.nio.channels.SelectionKey;
006: import java.nio.channels.SocketChannel;
007:
008: import org.mortbay.io.Buffer;
009: import org.mortbay.io.Connection;
010: import org.mortbay.io.nio.SelectorManager.SelectSet;
011: import org.mortbay.jetty.EofException;
012: import org.mortbay.jetty.HttpException;
013: import org.mortbay.log.Log;
014: import org.mortbay.thread.Timeout;
015: import org.omg.CORBA.SystemException;
016:
017: /* ------------------------------------------------------------ */
018: /**
019: * An Endpoint that can be scheduled by {@link SelectorManager}.
020: *
021: * @author gregw
022: *
023: */
024: public class SelectChannelEndPoint extends ChannelEndPoint implements
025: Runnable {
026: protected SelectorManager _manager;
027: protected SelectorManager.SelectSet _selectSet;
028: protected boolean _dispatched = false;
029: protected boolean _writable = true;
030: protected SelectionKey _key;
031: protected int _interestOps;
032: protected boolean _readBlocked;
033: protected boolean _writeBlocked;
034: protected Connection _connection;
035:
036: private Timeout.Task _timeoutTask = new IdleTask();
037:
038: /* ------------------------------------------------------------ */
039: public Connection getConnection() {
040: return _connection;
041: }
042:
043: /* ------------------------------------------------------------ */
044: public SelectChannelEndPoint(SocketChannel channel,
045: SelectSet selectSet, SelectionKey key) {
046: super (channel);
047:
048: _manager = selectSet.getManager();
049: _selectSet = selectSet;
050: _connection = _manager.newConnection(channel, this );
051:
052: _manager.endPointOpened(this ); // TODO not here!
053:
054: _key = key;
055: _key.attach(this ); // TODO not here!
056:
057: }
058:
059: /* ------------------------------------------------------------ */
060: /**
061: * Put the endpoint into the dispatched state.
062: * A blocked thread may be woken up by this call, or the endpoint placed in a state ready
063: * for a dispatch to a threadpool.
064: * @param assumeShortDispatch If true, the interested ops are not modified.
065: * @return True if the endpoint should be dispatched to a thread pool.
066: * @throws IOException
067: */
068: public boolean dispatch(boolean assumeShortDispatch)
069: throws IOException {
070: // If threads are blocked on this
071: synchronized (this ) {
072: if (_key == null) {
073: _readBlocked = false;
074: _writeBlocked = false;
075: this .notifyAll();
076: return false;
077: }
078:
079: if (_readBlocked || _writeBlocked) {
080: if (_readBlocked && _key.isReadable())
081: _readBlocked = false;
082: if (_writeBlocked && _key.isWritable())
083: _writeBlocked = false;
084:
085: // wake them up is as good as a dispatched.
086: this .notifyAll();
087:
088: // we are not interested in further selecting
089: _key.interestOps(0);
090: return false;
091: }
092:
093: if (!assumeShortDispatch)
094: _key.interestOps(0);
095:
096: // Otherwise if we are still dispatched
097: if (_dispatched) {
098: // we are not interested in further selecting
099: _key.interestOps(0);
100: return false;
101: }
102:
103: // Remove writeable op
104: if ((_key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE
105: && (_key.interestOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
106: // Remove writeable op
107: _interestOps = _key.interestOps()
108: & ~SelectionKey.OP_WRITE;
109: _key.interestOps(_interestOps);
110: }
111:
112: _dispatched = true;
113: }
114:
115: return true;
116: }
117:
118: /* ------------------------------------------------------------ */
119: public void scheduleIdle() {
120: _selectSet.scheduleIdle(_timeoutTask);
121: }
122:
123: /* ------------------------------------------------------------ */
124: public void cancelIdle() {
125: _selectSet.cancelIdle(_timeoutTask);
126: }
127:
128: /* ------------------------------------------------------------ */
129: protected void idleExpired() {
130: try {
131: close();
132: } catch (IOException e) {
133: Log.ignore(e);
134: }
135: }
136:
137: /* ------------------------------------------------------------ */
138: /**
139: * Called when a dispatched thread is no longer handling the endpoint. The selection key
140: * operations are updated.
141: */
142: public void undispatch() {
143: synchronized (this ) {
144: try {
145: _dispatched = false;
146:
147: if (getChannel().isOpen()) {
148: updateKey();
149: }
150: } catch (Exception e) {
151: // TODO investigate if this actually is a problem?
152: Log.ignore(e);
153: _interestOps = -1;
154: _selectSet.addChange(this );
155: }
156: }
157: }
158:
159: /* ------------------------------------------------------------ */
160: /*
161: */
162: public int flush(Buffer header, Buffer buffer, Buffer trailer)
163: throws IOException {
164: int l = super .flush(header, buffer, trailer);
165: _writable = l > 0;
166: return l;
167: }
168:
169: /* ------------------------------------------------------------ */
170: /*
171: */
172: public int flush(Buffer buffer) throws IOException {
173: int l = super .flush(buffer);
174: _writable = l > 0;
175: return l;
176: }
177:
178: /* ------------------------------------------------------------ */
179: public boolean isOpen() {
180: SelectionKey key = _key;
181: return super .isOpen() && key != null && key.isValid();
182: }
183:
184: /* ------------------------------------------------------------ */
185: /*
186: * Allows thread to block waiting for further events.
187: */
188: public boolean blockReadable(long timeoutMs) throws IOException {
189: synchronized (this ) {
190: long start = _selectSet.getNow();
191: try {
192: _readBlocked = true;
193: while (isOpen() && _readBlocked) {
194: try {
195: updateKey();
196: this .wait(timeoutMs);
197:
198: if (_readBlocked
199: && timeoutMs < (_selectSet.getNow() - start))
200: return false;
201: } catch (InterruptedException e) {
202: e.printStackTrace();
203: }
204: }
205: } finally {
206: _readBlocked = false;
207: }
208: }
209: return true;
210: }
211:
212: /* ------------------------------------------------------------ */
213: /*
214: * Allows thread to block waiting for further events.
215: */
216: public boolean blockWritable(long timeoutMs) throws IOException {
217: synchronized (this ) {
218: long start = _selectSet.getNow();
219: try {
220: _writeBlocked = true;
221: while (isOpen() && _writeBlocked) {
222: try {
223: updateKey();
224: this .wait(timeoutMs);
225:
226: if (_writeBlocked
227: && timeoutMs < (_selectSet.getNow() - start))
228: return false;
229: } catch (InterruptedException e) {
230: e.printStackTrace();
231: }
232: }
233: } finally {
234: _writeBlocked = false;
235: }
236: }
237: return true;
238: }
239:
240: /* ------------------------------------------------------------ */
241: /**
242: * Updates selection key. Adds operations types to the selection key as needed. No operations
243: * are removed as this is only done during dispatch. This method records the new key and
244: * schedules a call to syncKey to do the keyChange
245: */
246: private void updateKey() {
247: synchronized (this ) {
248: int ops = _key == null ? 0 : _key.interestOps();
249: _interestOps = ops
250: | ((!_dispatched || _readBlocked) ? SelectionKey.OP_READ
251: : 0)
252: | ((!_writable || _writeBlocked) ? SelectionKey.OP_WRITE
253: : 0);
254: _writable = true; // Once writable is in ops, only removed with dispatch.
255:
256: if (_interestOps != ops) {
257: _selectSet.addChange(this );
258: _selectSet.wakeup();
259: }
260: }
261: }
262:
263: /* ------------------------------------------------------------ */
264: /**
265: * Synchronize the interestOps with the actual key. Call is scheduled by a call to updateKey
266: */
267: public void syncKey() {
268: synchronized (this ) {
269: if (_key != null && _key.isValid()) {
270: if (_interestOps >= 0)
271: _key.interestOps(_interestOps);
272: else {
273: _key.cancel();
274: _manager.endPointClosed(this );
275: _key = null;
276: }
277: } else
278: _key = null;
279: }
280: }
281:
282: /* ------------------------------------------------------------ */
283: /*
284: */
285: public void run() {
286: try {
287: _connection.handle();
288: } catch (ClosedChannelException e) {
289: Log.ignore(e);
290: } catch (EofException e) {
291: Log.debug("EOF", e);
292: try {
293: close();
294: } catch (IOException e2) {
295: Log.ignore(e2);
296: }
297: } catch (HttpException e) {
298: Log.debug("BAD", e);
299: try {
300: close();
301: } catch (IOException e2) {
302: Log.ignore(e2);
303: }
304: } catch (Throwable e) {
305: Log.warn("handle failed", e);
306: try {
307: close();
308: } catch (IOException e2) {
309: Log.ignore(e2);
310: }
311: } finally {
312: undispatch();
313: }
314: }
315:
316: /* ------------------------------------------------------------ */
317: /*
318: * @see org.mortbay.io.nio.ChannelEndPoint#close()
319: */
320: public void close() throws IOException {
321: synchronized (this ) {
322: if (_key != null) {
323: _key.cancel();
324: }
325: _key = null;
326: }
327:
328: try {
329: super .close();
330: } catch (IOException e) {
331: Log.ignore(e);
332: }
333:
334: }
335:
336: /* ------------------------------------------------------------ */
337: public String toString() {
338: return "HEP@" + hashCode() + "[d=" + _dispatched + ",io="
339: + _interestOps + ",w=" + _writable + ",b="
340: + _readBlocked + "|" + _writeBlocked + "]";
341: }
342:
343: /* ------------------------------------------------------------ */
344: public Timeout.Task getTimeoutTask() {
345: return _timeoutTask;
346: }
347:
348: /* ------------------------------------------------------------ */
349: public SelectSet getSelectSet() {
350: return _selectSet;
351: }
352:
353: /* ------------------------------------------------------------ */
354: /* ------------------------------------------------------------ */
355: /* ------------------------------------------------------------ */
356: public class IdleTask extends Timeout.Task {
357: /* ------------------------------------------------------------ */
358: /*
359: * @see org.mortbay.thread.Timeout.Task#expire()
360: */
361: public void expire() {
362: idleExpired();
363: }
364:
365: public String toString() {
366: return "TimeoutTask:"
367: + SelectChannelEndPoint.this.toString();
368: }
369:
370: }
371:
372: }
|