0001: /*
0002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
0003: *
0004: * This library is free software; you can redistribute it and/or
0005: * modify it under the terms of the GNU Lesser General Public
0006: * License as published by the Free Software Foundation; either
0007: * version 2.1 of the License, or (at your option) any later version.
0008: *
0009: * This library is distributed in the hope that it will be useful,
0010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0012: * Lesser General Public License for more details.
0013: *
0014: * You should have received a copy of the GNU Lesser General Public
0015: * License along with this library; if not, write to the Free Software
0016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0017: *
0018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
0019: * The latest copy of this software may be found on http://www.xsocket.org/
0020: */
0021: package org.xsocket.connection;
0022:
0023: import java.io.IOException;
0024: import java.io.UnsupportedEncodingException;
0025: import java.net.InetAddress;
0026: import java.net.InetSocketAddress;
0027: import java.net.SocketTimeoutException;
0028: import java.nio.BufferOverflowException;
0029: import java.nio.BufferUnderflowException;
0030: import java.nio.ByteBuffer;
0031: import java.nio.channels.ClosedChannelException;
0032: import java.nio.channels.FileChannel;
0033: import java.nio.channels.ReadableByteChannel;
0034: import java.nio.channels.WritableByteChannel;
0035: import java.util.ArrayList;
0036: import java.util.HashMap;
0037: import java.util.List;
0038: import java.util.Map;
0039: import java.util.concurrent.Executor;
0040: import java.util.logging.Level;
0041: import java.util.logging.Logger;
0042:
0043: import javax.net.ssl.SSLContext;
0044:
0045: import org.xsocket.DataConverter;
0046: import org.xsocket.Execution;
0047: import org.xsocket.MaxReadSizeExceededException;
0048:
0049: /**
0050: * Implementation of the <code>IBlockingConnection</code> interface. Internally a {@link INonBlockingConnection}
0051: * will be used. A <code>BlockingConnection</code> wraps a <code>INonBlockingConnection</code>. There are two ways to
0052: * create a <code>BlockingConnection</code>:
0053: * <ul>
0054: * <li>by passing over the remote address (e.g. host name & port), or</li>
0055: * <li>by passing over a <code>INonBlockingConnection</code>, which will be wrapped</li>
0056: * </ul>
0057: * <br><br>
0058: *
0059: * A newly created connection is in the open state. Write or read methods can be called immediately <br><br>
0060: *
0061: * The methods of this class are not thread-safe.
0062: *
0063: * @author grro@xsocket.org
0064: */
0065: public class BlockingConnection implements IBlockingConnection {
0066:
0067: private static final Logger LOG = Logger
0068: .getLogger(BlockingConnection.class.getName());
0069:
0070: private final ReadNotificationHandler handler = new ReadNotificationHandler();
0071: private final Object readGuard = new Object();
0072:
0073: private INonBlockingConnection delegee = null;
0074: private int receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
0075:
0076: /**
0077: * constructor. <br><br>
0078: *
0079: * @param hostname the remote host
0080: * @param port the port of the remote host to connect
0081: * @throws IOException If some other I/O error occurs
0082: */
0083: public BlockingConnection(String hostname, int port)
0084: throws IOException {
0085: this (new InetSocketAddress(hostname, port), Integer.MAX_VALUE,
0086: new HashMap<String, Object>(), null, false);
0087: }
0088:
0089: /**
0090: * constructor. <br><br>
0091: *
0092: * @param hostname the remote host
0093: * @param port the port of the remote host to connect
0094: * @param options the socket options
0095: * @throws IOException If some other I/O error occurs
0096: */
0097: public BlockingConnection(String hostname, int port,
0098: Map<String, Object> options) throws IOException {
0099: this (new InetSocketAddress(hostname, port), Integer.MAX_VALUE,
0100: options, null, false);
0101: }
0102:
0103: /**
0104: * constructor
0105: *
0106: * @param address the remote host address
0107: * @param port the remote host port
0108: * @throws IOException If some other I/O error occurs
0109: */
0110: public BlockingConnection(InetAddress address, int port)
0111: throws IOException {
0112: this (address, port, Integer.MAX_VALUE,
0113: new HashMap<String, Object>(), null, false);
0114: }
0115:
0116: /**
0117: * constructor
0118: *
0119: * @param address the remote host address
0120: * @param port the remote host port
0121: * @param connectTimeoutMillis the timeout of the connect procedure
0122: * @throws IOException If some other I/O error occurs
0123: */
0124: public BlockingConnection(InetAddress address, int port,
0125: int connectTimeoutMillis) throws IOException {
0126: this (new InetSocketAddress(address, port),
0127: connectTimeoutMillis, new HashMap<String, Object>(),
0128: null, false);
0129: }
0130:
0131: /**
0132: * constructor
0133: *
0134: * @param address the remote host name
0135: * @param port the remote host port
0136: * @param sslContext the sslContext to use
0137: * @param sslOn true, activate SSL mode. false, ssl can be activated by user (see {@link IReadWriteableConnection#activateSecuredMode()})
0138: * @throws IOException If some other I/O error occurs
0139: */
0140: public BlockingConnection(InetAddress address, int port,
0141: SSLContext sslContext, boolean sslOn) throws IOException {
0142: this (new InetSocketAddress(address, port), Integer.MAX_VALUE,
0143: new HashMap<String, Object>(), sslContext, sslOn);
0144: }
0145:
0146: /**
0147: * constructor
0148: *
0149: * @param address the remote host name
0150: * @param port the remote host port
0151: * @param connectTimeoutMillis the timeout of the connect procedure
0152: * @param sslContext the sslContext to use
0153: * @param sslOn true, activate SSL mode. false, ssl can be activated by user (see {@link IReadWriteableConnection#activateSecuredMode()})
0154: * @throws IOException If some other I/O error occurs
0155: */
0156: public BlockingConnection(InetAddress address, int port,
0157: int connectTimeoutMillis, SSLContext sslContext,
0158: boolean sslOn) throws IOException {
0159: this (new InetSocketAddress(address, port),
0160: connectTimeoutMillis, new HashMap<String, Object>(),
0161: sslContext, sslOn);
0162: }
0163:
0164: /**
0165: * constructor
0166: *
0167: * @param address the remote host name
0168: * @param port the remote host port
0169: * @param options the socket options
0170: * @param sslContext the sslContext to use
0171: * @param sslOn true, activate SSL mode. false, ssl can be activated by user (see {@link IReadWriteableConnection#activateSecuredMode()})
0172: * @throws IOException If some other I/O error occurs
0173: */
0174: public BlockingConnection(InetAddress address, int port,
0175: Map<String, Object> options, SSLContext sslContext,
0176: boolean sslOn) throws IOException {
0177: this (new InetSocketAddress(address, port), Integer.MAX_VALUE,
0178: options, sslContext, sslOn);
0179: }
0180:
0181: /**
0182: * constructor
0183: *
0184: * @param address the remote host name
0185: * @param port the remote host port
0186: * @param connectTimeoutMillis the timeout of the connect procedure
0187: * @param options the socket options
0188: * @param sslContext the sslContext to use
0189: * @param sslOn true, activate SSL mode. false, ssl can be activated by user (see {@link IReadWriteableConnection#activateSecuredMode()})
0190: * @throws IOException If some other I/O error occurs
0191: */
0192: public BlockingConnection(InetAddress address, int port,
0193: int connectTimeoutMillis, Map<String, Object> options,
0194: SSLContext sslContext, boolean sslOn) throws IOException {
0195: this (new InetSocketAddress(address, port),
0196: connectTimeoutMillis, options, sslContext, sslOn);
0197: }
0198:
0199: /**
0200: * constructor
0201: *
0202: * @param hostname the remote host name
0203: * @param port the remote host port
0204: * @param sslContext the sslContext to use
0205: * @param sslOn true, activate SSL mode. false, ssl can be activated by user (see {@link IReadWriteableConnection#activateSecuredMode()})
0206: * * @throws IOException If some other I/O error occurs
0207: */
0208: public BlockingConnection(String hostname, int port,
0209: SSLContext sslContext, boolean sslOn) throws IOException {
0210: this (new InetSocketAddress(hostname, port), Integer.MAX_VALUE,
0211: new HashMap<String, Object>(), sslContext, sslOn);
0212: }
0213:
0214: /**
0215: * intermediate constructor
0216: *
0217: */
0218: private BlockingConnection(InetSocketAddress remoteAddress,
0219: int connectTimeoutMillis, Map<String, Object> options,
0220: SSLContext sslContext, boolean sslOn) throws IOException {
0221: setUnderlyingConnection(new NonBlockingConnection(
0222: remoteAddress, connectTimeoutMillis, options,
0223: sslContext, sslOn, handler,
0224: new SingleThreadedWorkerPool()));
0225: }
0226:
0227: /**
0228: * constructor
0229: *
0230: * @param delegee the underlying non blocking connection
0231: * @throws IOException If some other I/O error occurs
0232: */
0233: public BlockingConnection(INonBlockingConnection delegee)
0234: throws IOException {
0235: setUnderlyingConnection(delegee);
0236: delegee.setHandler(handler);
0237: }
0238:
0239: private void setUnderlyingConnection(INonBlockingConnection delegee) {
0240: this .delegee = delegee;
0241: }
0242:
0243: final INonBlockingConnection getDelegee() {
0244: return delegee;
0245: }
0246:
0247: /**
0248: * {@inheritDoc}
0249: */
0250: public void setReceiveTimeoutMillis(int timeout) throws IOException {
0251: this .receiveTimeout = timeout;
0252:
0253: int soTimeout = (Integer) delegee.getOption(SO_TIMEOUT);
0254: if (timeout > soTimeout) {
0255: delegee.setOption(SO_TIMEOUT, timeout);
0256: }
0257: }
0258:
0259: /**
0260: * {@inheritDoc}
0261: */
0262: public final int getReceiveTimeoutMillis() throws IOException {
0263: return receiveTimeout;
0264: }
0265:
0266: /**
0267: * {@inheritDoc}
0268: */
0269: public final void setEncoding(String defaultEncoding) {
0270: delegee.setEncoding(defaultEncoding);
0271: }
0272:
0273: /**
0274: * {@inheritDoc}
0275: */
0276: public final String getEncoding() {
0277: return delegee.getEncoding();
0278: }
0279:
0280: /**
0281: * return if the data source is open. Default is true
0282: * @return true, if the data source is open
0283: */
0284: public final boolean isOpen() {
0285: return delegee.isOpen();
0286: }
0287:
0288: /**
0289: * {@inheritDoc}
0290: */
0291: public final void close() throws IOException {
0292: delegee.close();
0293: }
0294:
0295: /**
0296: * {@inheritDoc}
0297: */
0298: public final void flush() throws ClosedChannelException,
0299: IOException, SocketTimeoutException {
0300: delegee.flush();
0301: }
0302:
0303: /**
0304: * {@inheritDoc}
0305: */
0306:
0307: public String getId() {
0308: return delegee.getId();
0309: }
0310:
0311: /**
0312: * {@inheritDoc}
0313: */
0314: public final InetAddress getRemoteAddress() {
0315: return delegee.getRemoteAddress();
0316: }
0317:
0318: /**
0319: * {@inheritDoc}
0320: */
0321: public final int getRemotePort() {
0322: return delegee.getRemotePort();
0323: }
0324:
0325: /**
0326: * {@inheritDoc}
0327: */
0328: public final InetAddress getLocalAddress() {
0329: return delegee.getLocalAddress();
0330: }
0331:
0332: /**
0333: * {@inheritDoc}
0334: */
0335: public final int getLocalPort() {
0336: return delegee.getLocalPort();
0337: }
0338:
0339: /**
0340: * {@inheritDoc}
0341: */
0342: public final int getPendingWriteDataSize() {
0343: return delegee.getPendingWriteDataSize();
0344: }
0345:
0346: /**
0347: * {@inheritDoc}
0348: */
0349: public final void suspendRead() throws IOException {
0350: delegee.suspendRead();
0351: }
0352:
0353: /**
0354: * {@inheritDoc}
0355: */
0356: public final void resumeRead() throws IOException {
0357: delegee.resumeRead();
0358: }
0359:
0360: /**
0361: * {@inheritDoc}
0362: */
0363: public void setFlushmode(FlushMode flushMode) {
0364: delegee.setFlushmode(flushMode);
0365: }
0366:
0367: /**
0368: * {@inheritDoc}
0369: */
0370: public FlushMode getFlushmode() {
0371: return delegee.getFlushmode();
0372: }
0373:
0374: /**
0375: * {@inheritDoc}
0376: */
0377: public final void setOption(String name, Object value)
0378: throws IOException {
0379: delegee.setOption(name, value);
0380: }
0381:
0382: /**
0383: * {@inheritDoc}
0384: */
0385: public final Object getOption(String name) throws IOException {
0386: return delegee.getOption(name);
0387: }
0388:
0389: /**
0390: * {@inheritDoc}
0391: */
0392: @SuppressWarnings("unchecked")
0393: public final Map<String, Class> getOptions() {
0394: return delegee.getOptions();
0395: }
0396:
0397: /**
0398: * {@inheritDoc}
0399: */
0400: public final void setIdleTimeoutMillis(long timeoutInMillis) {
0401: delegee.setIdleTimeoutMillis(timeoutInMillis);
0402: }
0403:
0404: /**
0405: * {@inheritDoc}
0406: */
0407: public final long getIdleTimeoutMillis() {
0408: return delegee.getIdleTimeoutMillis();
0409: }
0410:
0411: /**
0412: * {@inheritDoc}
0413: */
0414: public final void setConnectionTimeoutMillis(long timeoutMillis) {
0415: delegee.setConnectionTimeoutMillis(timeoutMillis);
0416: }
0417:
0418: /**
0419: * {@inheritDoc}
0420: */
0421: public final long getConnectionTimeoutMillis() {
0422: return delegee.getConnectionTimeoutMillis();
0423: }
0424:
0425: /**
0426: * {@inheritDoc}
0427: */
0428: public long getRemainingMillisToConnectionTimeout() {
0429: return delegee.getRemainingMillisToConnectionTimeout();
0430: }
0431:
0432: /**
0433: * {@inheritDoc}
0434: */
0435: public long getRemainingMillisToIdleTimeout() {
0436: return delegee.getRemainingMillisToIdleTimeout();
0437: }
0438:
0439: /**
0440: * {@inheritDoc}
0441: */
0442: public final void setAttachment(Object obj) {
0443: delegee.setAttachment(obj);
0444: }
0445:
0446: /**
0447: * {@inheritDoc}
0448: */
0449: public final Object getAttachment() {
0450: return delegee.getAttachment();
0451: }
0452:
0453: /**
0454: * {@inheritDoc}
0455: */
0456: public final void setAutoflush(boolean autoflush) {
0457: delegee.setAutoflush(autoflush);
0458: }
0459:
0460: /**
0461: * {@inheritDoc}
0462: */
0463: public final boolean isAutoflush() {
0464: return delegee.isAutoflush();
0465: }
0466:
0467: /**
0468: * {@inheritDoc}
0469: */
0470: public final void activateSecuredMode() throws IOException {
0471: delegee.activateSecuredMode();
0472: }
0473:
0474: /**
0475: * {@inheritDoc}
0476: */
0477: public boolean isSecure() {
0478: return delegee.isSecure();
0479: }
0480:
0481: /**
0482: * {@inheritDoc}
0483: */
0484: public final void markReadPosition() {
0485: delegee.markReadPosition();
0486: }
0487:
0488: /**
0489: * {@inheritDoc}
0490: */
0491: public final void markWritePosition() {
0492: delegee.markWritePosition();
0493: }
0494:
0495: /**
0496: * {@inheritDoc}.
0497: */
0498: public final int read(ByteBuffer buffer) throws IOException,
0499: ClosedChannelException {
0500: int size = buffer.remaining();
0501: if (size < 1) {
0502: return 0;
0503: }
0504:
0505: long start = System.currentTimeMillis();
0506: long remainingTime = receiveTimeout;
0507:
0508: synchronized (readGuard) {
0509: do {
0510: int availableSize = delegee.available();
0511:
0512: // if at least one byte is available -> read and return
0513: if (availableSize > 0) {
0514: int read = delegee.read(buffer);
0515: if (read > 0) {
0516: return read;
0517: }
0518: }
0519:
0520: if (availableSize == -1) {
0521: // check if channel is closed by reading with length 0
0522: // is closed a ClosedChannelException will be thrown
0523: delegee.read(ByteBuffer.allocate(0));
0524: }
0525:
0526: // no data available
0527: if (isOpen()) {
0528: waitForData(readGuard, remainingTime);
0529:
0530: } else {
0531: return -1;
0532: }
0533:
0534: remainingTime = (start + receiveTimeout)
0535: - System.currentTimeMillis();
0536: } while (remainingTime > 0);
0537: }
0538:
0539: if (LOG.isLoggable(Level.FINE)) {
0540: LOG.fine("receive timeout "
0541: + DataConverter.toFormatedDuration(receiveTimeout)
0542: + " reached. throwsing timeout exception");
0543: }
0544:
0545: throw new SocketTimeoutException("timeout "
0546: + DataConverter.toFormatedDuration(receiveTimeout)
0547: + " reached");
0548: }
0549:
0550: private void waitForData(Object readGuard, long maxWaittime) {
0551: try {
0552: readGuard.wait(maxWaittime);
0553: } catch (InterruptedException ignore) {
0554: }
0555: }
0556:
0557: /**
0558: * {@inheritDoc}
0559: */
0560: public final byte readByte() throws IOException,
0561: SocketTimeoutException {
0562: long start = System.currentTimeMillis();
0563: long remainingTime = receiveTimeout;
0564:
0565: do {
0566: synchronized (readGuard) {
0567: try {
0568: return delegee.readByte();
0569: } catch (BufferUnderflowException bue) {
0570: if (isOpen()) {
0571: waitForData(readGuard, remainingTime);
0572:
0573: } else {
0574: throw new ClosedChannelException();
0575: }
0576: }
0577: }
0578:
0579: remainingTime = (start + receiveTimeout)
0580: - System.currentTimeMillis();
0581: } while (remainingTime > 0);
0582:
0583: if (LOG.isLoggable(Level.FINE)) {
0584: LOG.fine("receive timeout "
0585: + DataConverter.toFormatedDuration(receiveTimeout)
0586: + " reached. throwsing timeout exception");
0587: }
0588:
0589: throw new SocketTimeoutException("timeout "
0590: + DataConverter.toFormatedDuration(receiveTimeout)
0591: + " reached");
0592: }
0593:
0594: /**
0595: * {@inheritDoc}
0596: */
0597: public final short readShort() throws IOException,
0598: SocketTimeoutException {
0599: long start = System.currentTimeMillis();
0600: long remainingTime = receiveTimeout;
0601:
0602: do {
0603: synchronized (readGuard) {
0604: try {
0605: return delegee.readShort();
0606: } catch (BufferUnderflowException bue) {
0607: if (isOpen()) {
0608: waitForData(readGuard, remainingTime);
0609: } else {
0610: throw new ClosedChannelException();
0611: }
0612: }
0613: }
0614:
0615: remainingTime = (start + receiveTimeout)
0616: - System.currentTimeMillis();
0617: } while (remainingTime > 0);
0618:
0619: if (LOG.isLoggable(Level.FINE)) {
0620: LOG.fine("receive timeout "
0621: + DataConverter.toFormatedDuration(receiveTimeout)
0622: + " reached. throwsing timeout exception");
0623: }
0624:
0625: throw new SocketTimeoutException("timeout "
0626: + DataConverter.toFormatedDuration(receiveTimeout)
0627: + " reached");
0628: }
0629:
0630: /**
0631: * {@inheritDoc}
0632: */
0633: public final int readInt() throws IOException,
0634: SocketTimeoutException {
0635: long start = System.currentTimeMillis();
0636: long remainingTime = receiveTimeout;
0637:
0638: do {
0639: synchronized (readGuard) {
0640: try {
0641: return delegee.readInt();
0642: } catch (BufferUnderflowException bue) {
0643: if (isOpen()) {
0644: waitForData(readGuard, remainingTime);
0645: } else {
0646: throw new ClosedChannelException();
0647: }
0648: }
0649: }
0650:
0651: remainingTime = (start + receiveTimeout)
0652: - System.currentTimeMillis();
0653: } while (remainingTime > 0);
0654:
0655: if (LOG.isLoggable(Level.FINE)) {
0656: LOG.fine("receive timeout "
0657: + DataConverter.toFormatedDuration(receiveTimeout)
0658: + " reached. throwsing timeout exception");
0659: }
0660:
0661: throw new SocketTimeoutException("timeout "
0662: + DataConverter.toFormatedDuration(receiveTimeout)
0663: + " reached");
0664: }
0665:
0666: /**
0667: * {@inheritDoc}
0668: */
0669: public final long readLong() throws IOException,
0670: SocketTimeoutException {
0671: long start = System.currentTimeMillis();
0672: long remainingTime = receiveTimeout;
0673:
0674: do {
0675: synchronized (readGuard) {
0676: try {
0677: return delegee.readLong();
0678: } catch (BufferUnderflowException bue) {
0679: if (isOpen()) {
0680: waitForData(readGuard, remainingTime);
0681: } else {
0682: throw new ClosedChannelException();
0683: }
0684: }
0685: }
0686:
0687: remainingTime = (start + receiveTimeout)
0688: - System.currentTimeMillis();
0689: } while (remainingTime > 0);
0690:
0691: if (LOG.isLoggable(Level.FINE)) {
0692: LOG.fine("receive timeout "
0693: + DataConverter.toFormatedDuration(receiveTimeout)
0694: + " reached. throwsing timeout exception");
0695: }
0696:
0697: throw new SocketTimeoutException("timeout "
0698: + DataConverter.toFormatedDuration(receiveTimeout)
0699: + " reached");
0700: }
0701:
0702: /**
0703: * {@inheritDoc}
0704: */
0705: public final double readDouble() throws IOException,
0706: SocketTimeoutException {
0707: long start = System.currentTimeMillis();
0708: long remainingTime = receiveTimeout;
0709:
0710: do {
0711: synchronized (readGuard) {
0712: try {
0713: return delegee.readDouble();
0714: } catch (BufferUnderflowException bue) {
0715: if (isOpen()) {
0716: waitForData(readGuard, remainingTime);
0717: } else {
0718: throw new ClosedChannelException();
0719: }
0720: }
0721: }
0722:
0723: remainingTime = (start + receiveTimeout)
0724: - System.currentTimeMillis();
0725: } while (remainingTime > 0);
0726:
0727: if (LOG.isLoggable(Level.FINE)) {
0728: LOG.fine("receive timeout "
0729: + DataConverter.toFormatedDuration(receiveTimeout)
0730: + " reached. throwsing timeout exception");
0731: }
0732:
0733: throw new SocketTimeoutException("timeout "
0734: + DataConverter.toFormatedDuration(receiveTimeout)
0735: + " reached");
0736: }
0737:
0738: /**
0739: * {@inheritDoc}
0740: */
0741: public final ByteBuffer[] readByteBufferByDelimiter(String delimiter)
0742: throws IOException, SocketTimeoutException {
0743: return readByteBufferByDelimiter(delimiter, getEncoding());
0744: }
0745:
0746: /**
0747: * {@inheritDoc}
0748: */
0749: public final ByteBuffer[] readByteBufferByDelimiter(
0750: String delimiter, int maxLength) throws IOException,
0751: MaxReadSizeExceededException, SocketTimeoutException {
0752: return readByteBufferByDelimiter(delimiter, getEncoding(),
0753: maxLength);
0754: }
0755:
0756: /**
0757: * {@inheritDoc}
0758: */
0759: public final ByteBuffer[] readByteBufferByDelimiter(
0760: String delimiter, String encoding) throws IOException,
0761: SocketTimeoutException {
0762: return readByteBufferByDelimiter(delimiter, encoding,
0763: Integer.MAX_VALUE);
0764: }
0765:
0766: /**
0767: * {@inheritDoc}
0768: */
0769: public final ByteBuffer[] readByteBufferByDelimiter(
0770: String delimiter, String encoding, int maxLength)
0771: throws IOException, MaxReadSizeExceededException,
0772: SocketTimeoutException {
0773:
0774: long start = System.currentTimeMillis();
0775: long remainingTime = receiveTimeout;
0776:
0777: do {
0778: synchronized (readGuard) {
0779: try {
0780: return delegee.readByteBufferByDelimiter(delimiter,
0781: encoding, maxLength);
0782:
0783: } catch (MaxReadSizeExceededException mre) {
0784: throw mre;
0785:
0786: } catch (BufferUnderflowException bue) {
0787: if (isOpen()) {
0788: waitForData(readGuard, remainingTime);
0789: } else {
0790: throw new ClosedChannelException();
0791: }
0792: }
0793: }
0794:
0795: remainingTime = (start + receiveTimeout)
0796: - System.currentTimeMillis();
0797: } while (remainingTime > 0);
0798:
0799: if (LOG.isLoggable(Level.FINE)) {
0800: LOG.fine("receive timeout "
0801: + DataConverter.toFormatedDuration(receiveTimeout)
0802: + " reached. throwsing timeout exception");
0803: }
0804:
0805: throw new SocketTimeoutException("timeout "
0806: + DataConverter.toFormatedDuration(receiveTimeout)
0807: + " reached");
0808: }
0809:
0810: /**
0811: * {@inheritDoc}
0812: */
0813: public final ByteBuffer[] readByteBufferByLength(int length)
0814: throws IOException, SocketTimeoutException {
0815: if (length <= 0) {
0816: return null;
0817: }
0818:
0819: long start = System.currentTimeMillis();
0820: long remainingTime = receiveTimeout;
0821:
0822: do {
0823: synchronized (readGuard) {
0824: try {
0825: return delegee.readByteBufferByLength(length);
0826: } catch (BufferUnderflowException bue) {
0827: if (isOpen()) {
0828: waitForData(readGuard, remainingTime);
0829: } else {
0830: throw new ClosedChannelException();
0831: }
0832: }
0833: }
0834:
0835: remainingTime = (start + receiveTimeout)
0836: - System.currentTimeMillis();
0837: } while (remainingTime > 0);
0838:
0839: if (LOG.isLoggable(Level.FINE)) {
0840: LOG.fine("receive timeout "
0841: + DataConverter.toFormatedDuration(receiveTimeout)
0842: + " reached. throwsing timeout exception");
0843: }
0844:
0845: throw new SocketTimeoutException("timeout "
0846: + DataConverter.toFormatedDuration(receiveTimeout)
0847: + " reached");
0848: }
0849:
0850: /**
0851: * {@inheritDoc}
0852: */
0853: public final byte[] readBytesByDelimiter(String delimiter)
0854: throws IOException, SocketTimeoutException {
0855: return readBytesByDelimiter(delimiter, getEncoding());
0856: }
0857:
0858: /**
0859: * {@inheritDoc}
0860: */
0861: public final byte[] readBytesByDelimiter(String delimiter,
0862: int maxLength) throws IOException,
0863: MaxReadSizeExceededException, SocketTimeoutException {
0864: return readBytesByDelimiter(delimiter, getEncoding(), maxLength);
0865: }
0866:
0867: /**
0868: * {@inheritDoc}
0869: */
0870: public final byte[] readBytesByDelimiter(String delimiter,
0871: String encoding) throws IOException, SocketTimeoutException {
0872: return readBytesByDelimiter(delimiter, encoding,
0873: Integer.MAX_VALUE);
0874: }
0875:
0876: /**
0877: * {@inheritDoc}
0878: */
0879: public final byte[] readBytesByDelimiter(String delimiter,
0880: String encoding, int maxLength) throws IOException,
0881: MaxReadSizeExceededException, SocketTimeoutException {
0882: return DataConverter.toBytes(readByteBufferByDelimiter(
0883: delimiter, encoding, maxLength));
0884: }
0885:
0886: /**
0887: * {@inheritDoc}
0888: */
0889: public final byte[] readBytesByLength(int length)
0890: throws IOException, SocketTimeoutException {
0891: return DataConverter.toBytes(readByteBufferByLength(length));
0892: }
0893:
0894: /**
0895: * {@inheritDoc}
0896: */
0897: public final String readStringByDelimiter(String delimiter)
0898: throws IOException, UnsupportedEncodingException,
0899: SocketTimeoutException {
0900: return readStringByDelimiter(delimiter, Integer.MAX_VALUE);
0901: }
0902:
0903: /**
0904: * {@inheritDoc}
0905: */
0906: public final String readStringByDelimiter(String delimiter,
0907: int maxLength) throws IOException,
0908: UnsupportedEncodingException, MaxReadSizeExceededException,
0909: SocketTimeoutException {
0910: return readStringByDelimiter(delimiter, getEncoding(),
0911: maxLength);
0912: }
0913:
0914: /**
0915: * {@inheritDoc}
0916: */
0917: public final String readStringByDelimiter(String delimiter,
0918: String encoding) throws IOException,
0919: UnsupportedEncodingException, MaxReadSizeExceededException,
0920: SocketTimeoutException {
0921: return readStringByDelimiter(delimiter, encoding,
0922: Integer.MAX_VALUE);
0923: }
0924:
0925: /**
0926: * {@inheritDoc}
0927: */
0928: public final String readStringByDelimiter(String delimiter,
0929: String encoding, int maxLength) throws IOException,
0930: UnsupportedEncodingException, MaxReadSizeExceededException,
0931: SocketTimeoutException {
0932: return DataConverter.toString(readByteBufferByDelimiter(
0933: delimiter, encoding, maxLength), encoding);
0934: }
0935:
0936: /**
0937: * {@inheritDoc}
0938: */
0939: public final String readStringByLength(int length)
0940: throws IOException, UnsupportedEncodingException,
0941: SocketTimeoutException {
0942: return readStringByLength(length, getEncoding());
0943: }
0944:
0945: /**
0946: * {@inheritDoc}
0947: */
0948: public final String readStringByLength(int length, String encoding)
0949: throws IOException, UnsupportedEncodingException,
0950: SocketTimeoutException {
0951: return DataConverter.toString(readByteBufferByLength(length),
0952: encoding);
0953: }
0954:
0955: /**
0956: * {@inheritDoc}
0957: */
0958: public final long transferTo(WritableByteChannel target, int length)
0959: throws IOException, SocketTimeoutException {
0960: long written = 0;
0961:
0962: ByteBuffer[] buffers = readByteBufferByLength(length);
0963: for (ByteBuffer buffer : buffers) {
0964: written += target.write(buffer);
0965: }
0966:
0967: return written;
0968: }
0969:
0970: /**
0971: * {@inheritDoc}
0972: */
0973: public final boolean resetToWriteMark() {
0974: return delegee.resetToWriteMark();
0975: }
0976:
0977: /**
0978: * {@inheritDoc}
0979: */
0980: public final boolean resetToReadMark() {
0981: return delegee.resetToReadMark();
0982: }
0983:
0984: /**
0985: * {@inheritDoc}
0986: */
0987: public final void removeReadMark() {
0988: delegee.removeReadMark();
0989: }
0990:
0991: /**
0992: * {@inheritDoc}
0993: */
0994: public final void removeWriteMark() {
0995: delegee.removeWriteMark();
0996: }
0997:
0998: /**
0999: * {@inheritDoc}
1000: */
1001: public final int write(byte b) throws IOException,
1002: BufferOverflowException {
1003: return delegee.write(b);
1004: }
1005:
1006: /**
1007: * {@inheritDoc}
1008: */
1009: public final int write(byte... bytes) throws IOException {
1010: return delegee.write(bytes);
1011: }
1012:
1013: /**
1014: * {@inheritDoc}
1015: */
1016: public final int write(byte[] bytes, int offset, int length)
1017: throws IOException {
1018: return delegee.write(bytes, offset, length);
1019: }
1020:
1021: /**
1022: * {@inheritDoc}
1023: */
1024: public final int write(short s) throws IOException {
1025: return delegee.write(s);
1026: }
1027:
1028: /**
1029: * {@inheritDoc}
1030: */
1031: public final int write(int i) throws IOException {
1032: return delegee.write(i);
1033: }
1034:
1035: /**
1036: * {@inheritDoc}
1037: */
1038: public final int write(long l) throws IOException {
1039: return delegee.write(l);
1040: }
1041:
1042: /**
1043: * {@inheritDoc}
1044: */
1045: public final int write(double d) throws IOException {
1046: return delegee.write(d);
1047: }
1048:
1049: /**
1050: * {@inheritDoc}
1051: */
1052: public final int write(String message) throws IOException {
1053: return delegee.write(message);
1054: }
1055:
1056: /**
1057: * {@inheritDoc}
1058: */
1059: public final int write(String message, String encoding)
1060: throws IOException {
1061: return delegee.write(message, encoding);
1062: }
1063:
1064: /**
1065: * {@inheritDoc}
1066: */
1067: public final long write(ArrayList<ByteBuffer> buffers)
1068: throws IOException {
1069: return delegee.write(buffers);
1070: }
1071:
1072: /**
1073: * {@inheritDoc}
1074: */
1075: public final long write(List<ByteBuffer> buffers)
1076: throws IOException {
1077: return delegee.write(buffers);
1078: }
1079:
1080: /**
1081: * {@inheritDoc}
1082: */
1083: public final long write(ByteBuffer[] buffers) throws IOException {
1084: return delegee.write(buffers);
1085: }
1086:
1087: /**
1088: * {@inheritDoc}
1089: */
1090: public long write(ByteBuffer[] srcs, int offset, int length)
1091: throws IOException {
1092: return delegee.write(srcs, offset, length);
1093: }
1094:
1095: /**
1096: * {@inheritDoc}
1097: */
1098: public final int write(ByteBuffer buffer) throws IOException {
1099: return delegee.write(buffer);
1100: }
1101:
1102: /**
1103: * {@inheritDoc}
1104: */
1105: public final long transferFrom(ReadableByteChannel source)
1106: throws IOException {
1107: return delegee.transferFrom(source);
1108: }
1109:
1110: /**
1111: * {@inheritDoc}
1112: */
1113: public final long transferFrom(ReadableByteChannel source,
1114: int chunkSize) throws IOException {
1115: return delegee.transferFrom(source, chunkSize);
1116: }
1117:
1118: private void onReadDataInserted() {
1119: synchronized (readGuard) {
1120: readGuard.notifyAll();
1121: }
1122: }
1123:
1124: public long transferFrom(FileChannel source) throws IOException {
1125: return delegee.transferFrom(source);
1126: }
1127:
1128: private static final class SingleThreadedWorkerPool implements
1129: Executor {
1130: public void execute(Runnable command) {
1131: command.run();
1132: }
1133: }
1134:
1135: @Override
1136: public String toString() {
1137: return delegee.toString();
1138: }
1139:
1140: @Execution(Execution.NONTHREADED)
1141: private final class ReadNotificationHandler implements
1142: IInternalHandler {
1143:
1144: public boolean onConnect(INonBlockingConnection connection)
1145: throws IOException, BufferUnderflowException,
1146: MaxReadSizeExceededException {
1147: return true;
1148: }
1149:
1150: public boolean onData(INonBlockingConnection connection)
1151: throws IOException, BufferUnderflowException,
1152: MaxReadSizeExceededException {
1153: onReadDataInserted();
1154: return true;
1155: }
1156:
1157: public boolean onDisconnect(INonBlockingConnection connection)
1158: throws IOException {
1159: onReadDataInserted();
1160: return true;
1161: }
1162:
1163: public boolean onConnectionTimeout(
1164: INonBlockingConnection connection) throws IOException {
1165: onReadDataInserted();
1166:
1167: connection.close();
1168: return true;
1169: }
1170:
1171: public boolean onIdleTimeout(INonBlockingConnection connection)
1172: throws IOException {
1173: onReadDataInserted();
1174:
1175: connection.close();
1176: return true;
1177: }
1178: }
1179: }
|