001: //========================================================================
002: //$Id: ChannelEndPoint.java,v 1.1 2005/10/05 14:09:38 janb Exp $
003: //Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
004: //------------------------------------------------------------------------
005: //Licensed under the Apache License, Version 2.0 (the "License");
006: //you may not use this file except in compliance with the License.
007: //You may obtain a copy of the License at
008: //http://www.apache.org/licenses/LICENSE-2.0
009: //Unless required by applicable law or agreed to in writing, software
010: //distributed under the License is distributed on an "AS IS" BASIS,
011: //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
012: //See the License for the specific language governing permissions and
013: //limitations under the License.
014: //========================================================================
015:
016: package org.mortbay.io.nio;
017:
018: import java.io.IOException;
019: import java.net.InetSocketAddress;
020: import java.net.Socket;
021: import java.nio.ByteBuffer;
022: import java.nio.channels.ByteChannel;
023: import java.nio.channels.GatheringByteChannel;
024: import java.nio.channels.SelectableChannel;
025: import java.nio.channels.SocketChannel;
026:
027: import org.mortbay.io.Buffer;
028: import org.mortbay.io.EndPoint;
029: import org.mortbay.io.Portable;
030:
031: /**
032: * @author gregw
033: *
034: * To change the template for this generated type comment go to
035: * Window - Preferences - Java - Code Generation - Code and Comments
036: */
037: public class ChannelEndPoint implements EndPoint {
038: protected ByteChannel _channel;
039: protected ByteBuffer[] _gather2;
040: protected ByteBuffer[] _gather3;
041: protected Socket _socket;
042: protected InetSocketAddress _local;
043: protected InetSocketAddress _remote;
044:
045: /**
046: *
047: */
048: public ChannelEndPoint(ByteChannel channel) {
049: super ();
050: this ._channel = channel;
051: if (channel instanceof SocketChannel)
052: _socket = ((SocketChannel) channel).socket();
053: }
054:
055: public boolean isBlocking() {
056: if (_channel instanceof SelectableChannel)
057: return ((SelectableChannel) _channel).isBlocking();
058: return true;
059: }
060:
061: public boolean blockReadable(long millisecs) throws IOException {
062: return true;
063: }
064:
065: public boolean blockWritable(long millisecs) throws IOException {
066: return true;
067: }
068:
069: /*
070: * @see org.mortbay.io.EndPoint#isOpen()
071: */
072: public boolean isOpen() {
073: return _channel.isOpen();
074: }
075:
076: /* (non-Javadoc)
077: * @see org.mortbay.io.EndPoint#close()
078: */
079: public void close() throws IOException {
080: if (_channel.isOpen()) {
081: try {
082: if (_channel instanceof SocketChannel) {
083: // TODO - is this really required?
084: Socket socket = ((SocketChannel) _channel).socket();
085: try {
086: socket.shutdownOutput();
087: } finally {
088: socket.close();
089: }
090: }
091: } finally {
092: _channel.close();
093: }
094: }
095: }
096:
097: /* (non-Javadoc)
098: * @see org.mortbay.io.EndPoint#fill(org.mortbay.io.Buffer)
099: */
100: public int fill(Buffer buffer) throws IOException {
101: Buffer buf = buffer.buffer();
102: int len = 0;
103: if (buf instanceof NIOBuffer) {
104: NIOBuffer nbuf = (NIOBuffer) buf;
105: ByteBuffer bbuf = nbuf.getByteBuffer();
106: synchronized (nbuf) {
107: try {
108: bbuf.position(buffer.putIndex());
109: len = _channel.read(bbuf);
110: } finally {
111: buffer.setPutIndex(bbuf.position());
112: bbuf.position(0);
113: }
114: }
115: } else {
116: throw new IOException("Not Implemented");
117: }
118:
119: return len;
120: }
121:
122: /* (non-Javadoc)
123: * @see org.mortbay.io.EndPoint#flush(org.mortbay.io.Buffer)
124: */
125: public int flush(Buffer buffer) throws IOException {
126: Buffer buf = buffer.buffer();
127: int len = 0;
128: if (buf instanceof NIOBuffer) {
129: NIOBuffer nbuf = (NIOBuffer) buf;
130: ByteBuffer bbuf = nbuf.getByteBuffer();
131:
132: // TODO synchronize or duplicate?
133: synchronized (nbuf) {
134: try {
135: bbuf.position(buffer.getIndex());
136: bbuf.limit(buffer.putIndex());
137: len = _channel.write(bbuf);
138: } finally {
139: if (len > 0)
140: buffer.skip(len);
141: bbuf.position(0);
142: bbuf.limit(bbuf.capacity());
143: }
144: }
145: } else if (buffer.array() != null) {
146: ByteBuffer b = ByteBuffer.wrap(buffer.array(), buffer
147: .getIndex(), buffer.length());
148: len = _channel.write(b);
149: if (len > 0)
150: buffer.skip(len);
151: } else {
152: throw new IOException("Not Implemented");
153: }
154: return len;
155: }
156:
157: /* (non-Javadoc)
158: * @see org.mortbay.io.EndPoint#flush(org.mortbay.io.Buffer, org.mortbay.io.Buffer, org.mortbay.io.Buffer)
159: */
160: public int flush(Buffer header, Buffer buffer, Buffer trailer)
161: throws IOException {
162: int length = 0;
163:
164: Buffer buf0 = header == null ? null : header.buffer();
165: Buffer buf1 = buffer == null ? null : buffer.buffer();
166: Buffer buf2 = trailer == null ? null : trailer.buffer();
167: if (_channel instanceof GatheringByteChannel && header != null
168: && header.length() != 0 && header instanceof NIOBuffer
169: && buffer != null && buffer.length() != 0
170: && buffer instanceof NIOBuffer) {
171: NIOBuffer nbuf0 = (NIOBuffer) buf0;
172: NIOBuffer nbuf1 = (NIOBuffer) buf1;
173: NIOBuffer nbuf2 = buf2 == null ? null : (NIOBuffer) buf2;
174:
175: // Get the underlying NIO buffers
176: ByteBuffer bbuf0 = nbuf0.getByteBuffer();
177: ByteBuffer bbuf1 = nbuf1.getByteBuffer();
178: ByteBuffer bbuf2 = nbuf2 == null ? null : nbuf2
179: .getByteBuffer();
180:
181: // We must sync because buffers may be shared (eg nbuf1 is likely to be cached content).
182: synchronized (nbuf0) {
183: synchronized (nbuf1) {
184: try {
185: // Adjust position indexs of buf0 and buf1
186: bbuf0.position(header.getIndex());
187: bbuf0.limit(header.putIndex());
188: bbuf1.position(buffer.getIndex());
189: bbuf1.limit(buffer.putIndex());
190:
191: // if we don't have a buf2
192: if (bbuf2 == null) {
193: synchronized (this ) {
194: // create a gether array for 2 buffers
195: if (_gather2 == null)
196: _gather2 = new ByteBuffer[2];
197: _gather2[0] = bbuf0;
198: _gather2[1] = bbuf1;
199:
200: // do the gathering write.
201: length = (int) ((GatheringByteChannel) _channel)
202: .write(_gather2);
203:
204: int hl = header.length();
205: if (length > hl) {
206: header.clear();
207: buffer.skip(length - hl);
208: } else if (length > 0) {
209: header.skip(length);
210: }
211: }
212: } else {
213: // we have a third buffer, so sync on it as well
214: synchronized (nbuf2) {
215: try {
216: // Adjust position indexs of buf2
217: bbuf2.position(trailer.getIndex());
218: bbuf2.limit(trailer.putIndex());
219:
220: synchronized (this ) {
221: // create a gether array for 3 buffers
222: if (_gather3 == null)
223: _gather3 = new ByteBuffer[3];
224: _gather3[0] = bbuf0;
225: _gather3[1] = bbuf1;
226: _gather3[2] = bbuf2;
227: // do the gathering write.
228: length = (int) ((GatheringByteChannel) _channel)
229: .write(_gather3);
230:
231: int hl = header.length();
232: int bl = buffer.length();
233: if (length > hl + bl) {
234: header.clear();
235: buffer.clear();
236: trailer.skip(length - hl
237: - bl);
238: } else if (length > hl) {
239: header.clear();
240: buffer.skip(length - hl);
241: } else if (length > 0) {
242: header.skip(length);
243: }
244: }
245: } finally {
246: // adjust buffer 2.
247: if (!trailer.isImmutable())
248: trailer.setGetIndex(bbuf2
249: .position());
250: bbuf2.position(0);
251: bbuf2.limit(bbuf2.capacity());
252: }
253: }
254: }
255: } finally {
256: // adjust buffer 0 and 1
257: if (!header.isImmutable())
258: header.setGetIndex(bbuf0.position());
259: if (!buffer.isImmutable())
260: buffer.setGetIndex(bbuf1.position());
261:
262: bbuf0.position(0);
263: bbuf1.position(0);
264: bbuf0.limit(bbuf0.capacity());
265: bbuf1.limit(bbuf1.capacity());
266: }
267: }
268: }
269: } else {
270: // TODO - consider copying buffers buffer and trailer into header if there is space!
271:
272: // flush header
273: if (header != null && header.length() > 0)
274: length = flush(header);
275:
276: // flush buffer
277: if ((header == null || header.length() == 0)
278: && buffer != null && buffer.length() > 0)
279: length += flush(buffer);
280:
281: // flush trailer
282: if ((header == null || header.length() == 0)
283: && (buffer == null || buffer.length() == 0)
284: && trailer != null && trailer.length() > 0)
285: length += flush(trailer);
286: }
287:
288: return length;
289: }
290:
291: /**
292: * @return Returns the channel.
293: */
294: public ByteChannel getChannel() {
295: return _channel;
296: }
297:
298: /* ------------------------------------------------------------ */
299: /*
300: * @see org.mortbay.io.EndPoint#getLocalAddr()
301: */
302: public String getLocalAddr() {
303: if (_socket == null)
304: return null;
305:
306: if (_local == null)
307: _local = (InetSocketAddress) _socket
308: .getLocalSocketAddress();
309:
310: if (_local == null || _local.getAddress() == null
311: || _local.getAddress().isAnyLocalAddress())
312: return Portable.ALL_INTERFACES;
313:
314: return _local.getAddress().getHostAddress();
315: }
316:
317: /* ------------------------------------------------------------ */
318: /*
319: * @see org.mortbay.io.EndPoint#getLocalHost()
320: */
321: public String getLocalHost() {
322: if (_socket == null)
323: return null;
324:
325: if (_local == null)
326: _local = (InetSocketAddress) _socket
327: .getLocalSocketAddress();
328:
329: if (_local == null || _local.getAddress() == null
330: || _local.getAddress().isAnyLocalAddress())
331: return Portable.ALL_INTERFACES;
332:
333: return _local.getAddress().getCanonicalHostName();
334: }
335:
336: /* ------------------------------------------------------------ */
337: /*
338: * @see org.mortbay.io.EndPoint#getLocalPort()
339: */
340: public int getLocalPort() {
341: if (_socket == null)
342: return 0;
343:
344: if (_local == null)
345: _local = (InetSocketAddress) _socket
346: .getLocalSocketAddress();
347: if (_local == null)
348: return -1;
349: return _local.getPort();
350: }
351:
352: /* ------------------------------------------------------------ */
353: /*
354: * @see org.mortbay.io.EndPoint#getRemoteAddr()
355: */
356: public String getRemoteAddr() {
357: if (_socket == null)
358: return null;
359:
360: if (_remote == null)
361: _remote = (InetSocketAddress) _socket
362: .getRemoteSocketAddress();
363:
364: if (_remote == null)
365: return null;
366: return _remote.getAddress().getHostAddress();
367: }
368:
369: /* ------------------------------------------------------------ */
370: /*
371: * @see org.mortbay.io.EndPoint#getRemoteHost()
372: */
373: public String getRemoteHost() {
374: if (_socket == null)
375: return null;
376:
377: if (_remote == null)
378: _remote = (InetSocketAddress) _socket
379: .getRemoteSocketAddress();
380:
381: return _remote.getAddress().getCanonicalHostName();
382: }
383:
384: /* ------------------------------------------------------------ */
385: /*
386: * @see org.mortbay.io.EndPoint#getRemotePort()
387: */
388: public int getRemotePort() {
389: if (_socket == null)
390: return 0;
391:
392: if (_remote == null)
393: _remote = (InetSocketAddress) _socket
394: .getRemoteSocketAddress();
395:
396: return _remote == null ? -1 : _remote.getPort();
397: }
398:
399: /* ------------------------------------------------------------ */
400: /*
401: * @see org.mortbay.io.EndPoint#getConnection()
402: */
403: public Object getTransport() {
404: return _channel;
405: }
406:
407: /* ------------------------------------------------------------ */
408: public void flush() throws IOException {
409: }
410:
411: /* ------------------------------------------------------------ */
412: public boolean isBufferingInput() {
413: return false;
414: }
415:
416: /* ------------------------------------------------------------ */
417: public boolean isBufferingOutput() {
418: return false;
419: }
420:
421: /* ------------------------------------------------------------ */
422: public boolean isBufferred() {
423: return false;
424: }
425: }
|