001: // ========================================================================
002: // Copyright 2003-2005 Mort Bay Consulting Pty. Ltd.
003: // ------------------------------------------------------------------------
004: // Licensed under the Apache License, Version 2.0 (the "License");
005: // you may not use this file except in compliance with the License.
006: // You may obtain a copy of the License at
007: // http://www.apache.org/licenses/LICENSE-2.0
008: // Unless required by applicable law or agreed to in writing, software
009: // distributed under the License is distributed on an "AS IS" BASIS,
010: // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011: // See the License for the specific language governing permissions and
012: // limitations under the License.
013: // ========================================================================
014:
015: package org.mortbay.jetty.nio;
016:
017: import java.io.IOException;
018: import java.net.InetSocketAddress;
019: import java.net.Socket;
020: import java.nio.channels.SelectionKey;
021: import java.nio.channels.ServerSocketChannel;
022: import java.nio.channels.SocketChannel;
023:
024: import org.mortbay.io.Buffer;
025: import org.mortbay.io.Connection;
026: import org.mortbay.io.nio.SelectChannelEndPoint;
027: import org.mortbay.io.nio.SelectorManager;
028: import org.mortbay.io.nio.SelectorManager.SelectSet;
029: import org.mortbay.jetty.HttpConnection;
030: import org.mortbay.jetty.Request;
031: import org.mortbay.jetty.RetryRequest;
032: import org.mortbay.log.Log;
033: import org.mortbay.thread.Timeout;
034: import org.mortbay.util.ajax.Continuation;
035:
036: /* ------------------------------------------------------------------------------- */
037: /**
038: * Selecting NIO connector.
039: * <p>
040: * This connector uses efficient NIO buffers with a non blocking threading model. Direct NIO buffers
041: * are used and threads are only allocated to connections with requests. Synchronization is used to
042: * simulate blocking for the servlet API, and any unflushed content at the end of request handling
043: * is written asynchronously.
044: * </p>
045: * <p>
046: * This connector is best used when there are many connections that have idle periods.
047: * </p>
048: * <p>
049: * When used with {@link org.mortbay.util.ajax.Continuation}, threadless waits are supported. When
050: * a filter or servlet calls suspend on a Continuation, a {@link org.mortbay.jetty.RetryRequest}
051: * runtime exception is thrown to allow the thread to exit the current request handling. Jetty will
052: * catch this exception and will not send a response to the client. Instead the thread is released
053: * and the Continuation is placed on the timer queue. If the Continuation timeout expires, or its
054: * resume method is called, then the request is again allocated a thread and the request is retried.
055: * The limitation of this approach is that request content is not available on the retried request,
056: * thus if possible it should be read after the continuation or saved as a request attribute or as the
057: * associated object of the Continuation instance.
058: * </p>
059: *
060: * @org.apache.xbean.XBean element="nioConnector" description="Creates an NIO based socket connector"
061: *
062: * @author gregw
063: *
064: */
065: public class SelectChannelConnector extends AbstractNIOConnector {
066: private transient ServerSocketChannel _acceptChannel;
067:
068: private SelectorManager _manager = new SelectorManager() {
069: protected SocketChannel acceptChannel(SelectionKey key)
070: throws IOException {
071: SocketChannel channel = ((ServerSocketChannel) key
072: .channel()).accept();
073: if (channel == null)
074: return null;
075: channel.configureBlocking(false);
076: Socket socket = channel.socket();
077: configure(socket);
078: return channel;
079: }
080:
081: protected boolean dispatch(Runnable task) throws IOException {
082: return getThreadPool().dispatch(task);
083: }
084:
085: protected void endPointClosed(SelectChannelEndPoint endpoint) {
086: connectionClosed((HttpConnection) endpoint.getConnection());
087: }
088:
089: protected void endPointOpened(SelectChannelEndPoint endpoint) {
090: connectionOpened((HttpConnection) endpoint.getConnection());
091: }
092:
093: protected Connection newConnection(SocketChannel channel,
094: SelectChannelEndPoint endpoint) {
095: return new HttpConnection(SelectChannelConnector.this ,
096: endpoint, getServer());
097: }
098:
099: protected SelectChannelEndPoint newEndPoint(
100: SocketChannel channel, SelectSet selectSet,
101: SelectionKey sKey) throws IOException {
102: return SelectChannelConnector.this .newEndPoint(channel,
103: selectSet, sKey);
104: }
105: };
106:
107: /* ------------------------------------------------------------------------------- */
108: /**
109: * Constructor.
110: *
111: */
112: public SelectChannelConnector() {
113: }
114:
115: /* ------------------------------------------------------------ */
116: public void accept(int acceptorID) throws IOException {
117: _manager.doSelect(acceptorID);
118: }
119:
120: /* ------------------------------------------------------------ */
121: public void stopAccept(int acceptorID) throws Exception {
122: _manager.doStop(acceptorID);
123: }
124:
125: /* ------------------------------------------------------------ */
126: public void close() throws IOException {
127: if (_acceptChannel != null)
128: _acceptChannel.close();
129: _acceptChannel = null;
130:
131: }
132:
133: /* ------------------------------------------------------------------------------- */
134: public void customize(org.mortbay.io.EndPoint endpoint,
135: Request request) throws IOException {
136: ((ConnectorEndPoint) endpoint).cancelIdle();
137: super .customize(endpoint, request);
138: }
139:
140: /* ------------------------------------------------------------------------------- */
141: public void persist(org.mortbay.io.EndPoint endpoint)
142: throws IOException {
143: ((ConnectorEndPoint) endpoint).scheduleIdle();
144: super .persist(endpoint);
145: }
146:
147: /* ------------------------------------------------------------ */
148: public Object getConnection() {
149: return _acceptChannel;
150: }
151:
152: /* ------------------------------------------------------------ */
153: /** Get delay select key update
154: * If true, the select set is not updated when a endpoint is dispatched for
155: * reading. The assumption is that the task will be short and thus will probably
156: * be complete before the select is tried again.
157: * @return Returns the assumeShortDispatch.
158: */
159: public boolean getDelaySelectKeyUpdate() {
160: return _manager.isDelaySelectKeyUpdate();
161: }
162:
163: /* ------------------------------------------------------------------------------- */
164: public int getLocalPort() {
165: if (_acceptChannel == null || !_acceptChannel.isOpen())
166: return -1;
167: return _acceptChannel.socket().getLocalPort();
168: }
169:
170: /* ------------------------------------------------------------ */
171: /*
172: * @see org.mortbay.jetty.Connector#newContinuation()
173: */
174: public Continuation newContinuation() {
175: return new RetryContinuation();
176: }
177:
178: /* ------------------------------------------------------------ */
179: public void open() throws IOException {
180: if (_acceptChannel == null) {
181: // Create a new server socket and set to non blocking mode
182: _acceptChannel = ServerSocketChannel.open();
183: _acceptChannel.configureBlocking(false);
184:
185: // Bind the server socket to the local host and port
186: InetSocketAddress addr = getHost() == null ? new InetSocketAddress(
187: getPort())
188: : new InetSocketAddress(getHost(), getPort());
189: _acceptChannel.socket().bind(addr, getAcceptQueueSize());
190:
191: // Register accepts on the server socket with the selector.
192: _manager.register(_acceptChannel, SelectionKey.OP_ACCEPT);
193: }
194: }
195:
196: /* ------------------------------------------------------------ */
197: /**
198: * @param delay If true, updating a {@link SelectionKey} is delayed until a redundant event is
199: * schedules. This is an optimization that assumes event handling can be completed before the next select
200: * completes.
201: */
202: public void setDelaySelectKeyUpdate(boolean delay) {
203: _manager.setDelaySelectKeyUpdate(delay);
204: }
205:
206: /* ------------------------------------------------------------ */
207: public void setMaxIdleTime(int maxIdleTime) {
208: _manager.setMaxIdleTime(maxIdleTime);
209: super .setMaxIdleTime(maxIdleTime);
210: }
211:
212: /* ------------------------------------------------------------ */
213: /*
214: * @see org.mortbay.jetty.AbstractConnector#doStart()
215: */
216: protected void doStart() throws Exception {
217: _manager.setSelectSets(getAcceptors());
218: _manager.setMaxIdleTime(getMaxIdleTime());
219: _manager.start();
220: super .doStart();
221: }
222:
223: /* ------------------------------------------------------------ */
224: /*
225: * @see org.mortbay.jetty.AbstractConnector#doStop()
226: */
227: protected void doStop() throws Exception {
228: _manager.stop();
229: super .doStop();
230: }
231:
232: /* ------------------------------------------------------------ */
233: protected SelectChannelEndPoint newEndPoint(SocketChannel channel,
234: SelectSet selectSet, SelectionKey key) throws IOException {
235: return new ConnectorEndPoint(channel, selectSet, key);
236: }
237:
238: /* ------------------------------------------------------------ */
239: /* ------------------------------------------------------------ */
240: /* ------------------------------------------------------------ */
241: public static class ConnectorEndPoint extends SelectChannelEndPoint {
242: public ConnectorEndPoint(SocketChannel channel,
243: SelectSet selectSet, SelectionKey key) {
244: super (channel, selectSet, key);
245: scheduleIdle();
246: }
247:
248: public void close() throws IOException {
249: RetryContinuation continuation = (RetryContinuation) ((HttpConnection) getConnection())
250: .getRequest().getContinuation();
251: if (continuation != null && continuation.isPending())
252: continuation.reset();
253:
254: super .close();
255: }
256:
257: public void undispatch() {
258: RetryContinuation continuation = (RetryContinuation) ((HttpConnection) getConnection())
259: .getRequest().getContinuation();
260:
261: if (continuation != null) {
262: // We have a continuation
263: Log.debug("continuation {}", continuation);
264: if (!continuation.schedule())
265: super .undispatch();
266: } else {
267: super .undispatch();
268: }
269: }
270: }
271:
272: /* ------------------------------------------------------------ */
273: /* ------------------------------------------------------------ */
274: /* ------------------------------------------------------------ */
275: public static class RetryContinuation extends Timeout.Task
276: implements Continuation, Runnable {
277: SelectChannelEndPoint _endPoint = (SelectChannelEndPoint) HttpConnection
278: .getCurrentConnection().getEndPoint();
279: boolean _new = true;
280: Object _object;
281: boolean _pending = false; // waiting for resume or timeout
282: boolean _resumed = false; // resume called.
283: boolean _scheduled = false; // Either dispatched or timeout set.
284: RetryRequest _retry;
285: long _timeout;
286:
287: public void expire() {
288: boolean redispatch = false;
289: synchronized (this ) {
290: redispatch = _scheduled && _pending && !_resumed;
291: }
292: if (redispatch) {
293: _endPoint.scheduleIdle(); // TODO maybe not needed?
294: _endPoint.getSelectSet().addChange(this );
295: _endPoint.getSelectSet().wakeup();
296: }
297: }
298:
299: public Object getObject() {
300: return _object;
301: }
302:
303: public long getTimeout() {
304: return _timeout;
305: }
306:
307: public boolean isNew() {
308: return _new;
309: }
310:
311: public boolean isPending() {
312: return _pending;
313: }
314:
315: public boolean isResumed() {
316: return _pending;
317: }
318:
319: public void reset() {
320: synchronized (this ) {
321: _resumed = false;
322: _pending = false;
323: _scheduled = false;
324: }
325:
326: synchronized (_endPoint.getSelectSet()) {
327: this .cancel();
328: }
329: }
330:
331: public void resume() {
332: boolean redispatch = false;
333: synchronized (this ) {
334: if (_pending && !isExpired()) {
335: _resumed = true;
336: redispatch = _scheduled;
337: }
338: }
339:
340: if (redispatch) {
341: SelectSet selectSet = _endPoint.getSelectSet();
342:
343: synchronized (selectSet) {
344: this .cancel();
345: }
346:
347: _endPoint.scheduleIdle(); // TODO maybe not needed?
348: selectSet.addChange(this );
349: selectSet.wakeup();
350: }
351: }
352:
353: public void run() {
354: _endPoint.run();
355: }
356:
357: /* schedule continuation.
358: * Called when a run exits.
359: * Either sets timeout or dispatches if already resumed or expired */
360: public boolean schedule() {
361: boolean redispatch = false;
362:
363: synchronized (this ) {
364: if (!_pending)
365: return false;
366: _scheduled = true;
367: redispatch = isExpired() || _resumed;
368: }
369:
370: if (redispatch) {
371: _endPoint.scheduleIdle();
372: _endPoint.getSelectSet().addChange(this );
373: } else
374: _endPoint.getSelectSet()
375: .scheduleTimeout(this , _timeout);
376:
377: _endPoint.getSelectSet().wakeup();
378: return true;
379: }
380:
381: public void setObject(Object object) {
382: _object = object;
383: }
384:
385: public boolean suspend(long timeout) {
386: boolean resumed = false;
387: synchronized (this ) {
388: resumed = _resumed;
389: _resumed = false;
390: _new = false;
391: if (!_pending && !resumed && timeout >= 0) {
392: _pending = true;
393: _scheduled = false;
394: _timeout = timeout;
395: if (_retry == null)
396: _retry = new RetryRequest();
397: throw _retry;
398: }
399:
400: // here only if suspend called on pending continuation.
401: // acts like a reset
402: _resumed = false;
403: _pending = false;
404: }
405:
406: synchronized (_endPoint.getSelectSet()) {
407: this.cancel();
408: }
409:
410: return resumed;
411: }
412: }
413:
414: }
|