001: //========================================================================
002: // Parts Copyright 2006 Mort Bay Consulting Pty. Ltd.
003: //------------------------------------------------------------------------
004: // Licensed under the Apache License, Version 2.0 (the "License");
005: // you may not use this file except in compliance with the License.
006: // You may obtain a copy of the License at
007: // http://www.apache.org/licenses/LICENSE-2.0
008: // Unless required by applicable law or agreed to in writing, software
009: // distributed under the License is distributed on an "AS IS" BASIS,
010: // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
011: // See the License for the specific language governing permissions and
012: // limitations under the License.
013: //========================================================================
014:
015: package org.mortbay.jetty.grizzly;
016:
017: import com.sun.enterprise.web.connector.grizzly.OutputWriter;
018: import java.io.IOException;
019: import java.nio.ByteBuffer;
020: import java.nio.channels.ByteChannel;
021: import java.nio.channels.GatheringByteChannel;
022: import java.nio.channels.SocketChannel;
023: import org.mortbay.io.Buffer;
024:
025: import org.mortbay.io.nio.ChannelEndPoint;
026: import org.mortbay.io.nio.NIOBuffer;
027: import org.mortbay.jetty.HttpConnection;
028: import org.mortbay.jetty.HttpParser;
029: import org.mortbay.log.Log;
030: import org.mortbay.util.ajax.Continuation;
031:
032: public class GrizzlyEndPoint extends ChannelEndPoint {
033: protected HttpConnection _connection;
034:
035: private GrizzlySocketChannel _blockingChannel;
036:
037: public GrizzlyEndPoint(GrizzlyConnector connector,
038: ByteChannel channel) throws IOException {
039: // TODO: Needs an empty constructor?
040: super (channel);
041:
042: //System.err.println("\nnew GrizzlyEndPoint channel="+channel);
043: _connection = new HttpConnection(connector, this , connector
044: .getServer());
045: }
046:
047: public void handle() {
048: //System.err.println("GrizzlyEndPoint.handle "+this);
049:
050: try {
051: //System.err.println("handle "+this);
052: _connection.handle();
053: } catch (Throwable e) {
054: Log.warn("handle failed", e);
055: throw new RuntimeException(e);
056: } finally {
057: //System.err.println("handled "+this);
058: Continuation continuation = _connection.getRequest()
059: .getContinuation();
060: if (continuation != null && continuation.isPending()) {
061: // We have a continuation
062: // TODO something!
063: } else {
064: // something else... normally re-enable this connection is the selectset with the latest interested ops
065: }
066: }
067:
068: }
069:
070: /* (non-Javadoc)
071: * @see org.mortbay.io.EndPoint#fill(org.mortbay.io.Buffer)
072: */
073: public int fill(Buffer buffer) throws IOException {
074: return 0; // Always filled way before by Grizzly.
075: }
076:
077: /* (non-Javadoc)
078: * @see org.mortbay.io.EndPoint#flush(org.mortbay.io.Buffer)
079: */
080: public int flush(Buffer buffer) throws IOException {
081: Buffer buf = buffer.buffer();
082: int len = 0;
083: if (buf instanceof NIOBuffer) {
084: NIOBuffer nbuf = (NIOBuffer) buf;
085: ByteBuffer bbuf = nbuf.getByteBuffer();
086:
087: // TODO synchronize or duplicate?
088: synchronized (nbuf) {
089: try {
090: bbuf.position(buffer.getIndex());
091: bbuf.limit(buffer.putIndex());
092: len = bbuf.remaining();
093: OutputWriter.flushChannel((SocketChannel) _channel,
094: bbuf);
095: } finally {
096: if (!buffer.isImmutable())
097: buffer.setGetIndex(bbuf.position());
098: bbuf.position(0);
099: bbuf.limit(bbuf.capacity());
100: }
101: }
102: } else if (buffer.array() != null) {
103: ByteBuffer b = ByteBuffer.wrap(buffer.array(), buffer
104: .getIndex(), buffer.length());
105: len = b.remaining();
106: OutputWriter.flushChannel((SocketChannel) _channel, b);
107: if (!buffer.isImmutable())
108: buffer.setGetIndex(b.position());
109: } else {
110: throw new IOException("Not Implemented");
111: }
112:
113: return len;
114: }
115:
116: /* (non-Javadoc)
117: * @see org.mortbay.io.EndPoint#flush(org.mortbay.io.Buffer, org.mortbay.io.Buffer, org.mortbay.io.Buffer)
118: */
119: public int flush(Buffer header, Buffer buffer, Buffer trailer)
120: throws IOException {
121: int length = 0;
122:
123: Buffer buf0 = header == null ? null : header.buffer();
124: Buffer buf1 = buffer == null ? null : buffer.buffer();
125: Buffer buf2 = trailer == null ? null : trailer.buffer();
126: if (_channel instanceof GatheringByteChannel && header != null
127: && header.length() != 0 && header instanceof NIOBuffer
128: && buffer != null && buffer.length() != 0
129: && buffer instanceof NIOBuffer) {
130: NIOBuffer nbuf0 = (NIOBuffer) buf0;
131: NIOBuffer nbuf1 = (NIOBuffer) buf1;
132: NIOBuffer nbuf2 = buf2 == null ? null : (NIOBuffer) buf2;
133:
134: // Get the underlying NIO buffers
135: ByteBuffer bbuf0 = nbuf0.getByteBuffer();
136: ByteBuffer bbuf1 = nbuf1.getByteBuffer();
137: ByteBuffer bbuf2 = nbuf2 == null ? null : nbuf2
138: .getByteBuffer();
139:
140: // We must sync because buffers may be shared (eg nbuf1 is likely to be cached content).
141: synchronized (nbuf0) {
142: synchronized (nbuf1) {
143: try {
144: // Adjust position indexs of buf0 and buf1
145: bbuf0.position(header.getIndex());
146: bbuf0.limit(header.putIndex());
147: bbuf1.position(buffer.getIndex());
148: bbuf1.limit(buffer.putIndex());
149:
150: // if we don't have a buf2
151: if (bbuf2 == null) {
152: synchronized (this ) {
153: // create a gether array for 2 buffers
154: if (_gather2 == null)
155: _gather2 = new ByteBuffer[2];
156: _gather2[0] = bbuf0;
157: _gather2[1] = bbuf1;
158:
159: // do the gathering write.
160: length = (int) OutputWriter
161: .flushChannel(
162: (SocketChannel) _channel,
163: _gather2);
164: }
165: } else {
166: // we have a third buffer, so sync on it as well
167: synchronized (nbuf2) {
168: try {
169: // Adjust position indexs of buf2
170: bbuf2.position(trailer.getIndex());
171: bbuf2.limit(trailer.putIndex());
172:
173: synchronized (this ) {
174: // create a gether array for 3 buffers
175: if (_gather3 == null)
176: _gather3 = new ByteBuffer[3];
177: _gather3[0] = bbuf0;
178: _gather3[1] = bbuf1;
179: _gather3[2] = bbuf2;
180: // do the gathering write.
181: length = (int) OutputWriter
182: .flushChannel(
183: (SocketChannel) _channel,
184: _gather3);
185: }
186: } finally {
187: // adjust buffer 2.
188: if (!trailer.isImmutable())
189: trailer.setGetIndex(bbuf2
190: .position());
191: bbuf2.position(0);
192: bbuf2.limit(bbuf2.capacity());
193: }
194: }
195: }
196: } finally {
197: // adjust buffer 0 and 1
198: if (!header.isImmutable())
199: header.setGetIndex(bbuf0.position());
200: if (!buffer.isImmutable())
201: buffer.setGetIndex(bbuf1.position());
202:
203: bbuf0.position(0);
204: bbuf1.position(0);
205: bbuf0.limit(bbuf0.capacity());
206: bbuf1.limit(bbuf1.capacity());
207: }
208: }
209: }
210: } else {
211: // TODO - consider copying buffers buffer and trailer into header if there is space!
212:
213: // flush header
214: if (header != null && header.length() > 0)
215: length = flush(header);
216:
217: // flush buffer
218: if ((header == null || header.length() == 0)
219: && buffer != null && buffer.length() > 0)
220: length += flush(buffer);
221:
222: // flush trailer
223: if ((header == null || header.length() == 0)
224: && (buffer == null || buffer.length() == 0)
225: && trailer != null && trailer.length() > 0)
226: length += flush(trailer);
227: }
228:
229: return length;
230: }
231:
232: public boolean blockReadable(long millisecs) {
233: Buffer buffer = ((HttpParser) _connection.getParser())
234: .getHeaderBuffer();
235: if (buffer instanceof NIOBuffer) {
236: ByteBuffer byteBuffer = ((NIOBuffer) buffer)
237: .getByteBuffer();
238: _blockingChannel.setReadTimeout(millisecs);
239: try {
240: _blockingChannel.read(byteBuffer);
241: } catch (IOException ex) {
242: ; // TODO: Rethrow in case the client closed the connection.
243: return false;
244: }
245: } else {
246: ; // TODO: How to handle this case.
247: }
248: return true;
249: }
250:
251: public boolean blockWritable(long millisecs) {
252: Buffer buffer = ((HttpParser) _connection.getParser())
253: .getHeaderBuffer();
254: if (buffer instanceof NIOBuffer) {
255: ByteBuffer byteBuffer = ((NIOBuffer) buffer)
256: .getByteBuffer();
257: _blockingChannel.setWriteTimeout(millisecs);
258: try {
259: _blockingChannel.write(byteBuffer);
260: } catch (IOException ex) {
261: // TODO: Rethrow in case the client closed the connection.
262: return false;
263: }
264: } else {
265: ; //TODO: How to handle this case.
266: }
267: return true;
268: }
269:
270: public boolean keepAlive() {
271: return _connection.getGenerator().isPersistent();
272: }
273:
274: public boolean isComplete() {
275: return _connection.getGenerator().isComplete();
276: }
277:
278: public boolean isBlocking() {
279: return false;
280: }
281:
282: public void setChannel(ByteChannel channel) {
283: _channel = channel;
284: _blockingChannel = new GrizzlySocketChannel();
285: }
286:
287: public void recycle() {
288: _connection.destroy();
289: }
290:
291: public HttpConnection getHttpConnection() {
292: return _connection;
293: }
294:
295: /* (non-Javadoc)
296: * @see org.mortbay.io.EndPoint#close()
297: */
298: public void close() throws IOException {
299: ; // Do nothing as this will be handled by Grizzly.
300: }
301: }
|