0001: /*
0002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
0003: *
0004: * This library is free software; you can redistribute it and/or
0005: * modify it under the terms of the GNU Lesser General Public
0006: * License as published by the Free Software Foundation; either
0007: * version 2.1 of the License, or (at your option) any later version.
0008: *
0009: * This library is distributed in the hope that it will be useful,
0010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
0011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0012: * Lesser General Public License for more details.
0013: *
0014: * You should have received a copy of the GNU Lesser General Public
0015: * License along with this library; if not, write to the Free Software
0016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0017: *
0018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
0019: * The latest copy of this software may be found on http://www.xsocket.org/
0020: */
0021: package org.xsocket.connection.spi;
0022:
0023: import java.io.IOException;
0024: import java.net.InetAddress;
0025: import java.net.SocketTimeoutException;
0026: import java.nio.ByteBuffer;
0027: import java.nio.channels.ClosedChannelException;
0028: import java.nio.channels.SelectionKey;
0029: import java.nio.channels.SocketChannel;
0030: import java.util.Collections;
0031: import java.util.HashMap;
0032: import java.util.Map;
0033: import java.util.logging.Level;
0034: import java.util.logging.Logger;
0035:
0036: import org.xsocket.IDispatcher;
0037: import org.xsocket.IHandle;
0038: import org.xsocket.DataConverter;
0039:
0040: /**
0041: * Socket based io handler
0042: *
0043: * @author grro@xsocket.org
0044: */
0045: final class IoSocketHandler extends ChainableIoHandler implements
0046: IHandle {
0047:
0048: private static final Logger LOG = Logger
0049: .getLogger(IoSocketHandler.class.getName());
0050:
0051: private static final int MAXSIZE_LOG_READ = 2000;
0052:
0053: @SuppressWarnings("unchecked")
0054: private static final Map<String, Class> SUPPORTED_OPTIONS = new HashMap<String, Class>();
0055:
0056: static {
0057: SUPPORTED_OPTIONS.put(IClientIoProvider.SO_RCVBUF,
0058: Integer.class);
0059: SUPPORTED_OPTIONS.put(IClientIoProvider.SO_SNDBUF,
0060: Integer.class);
0061: SUPPORTED_OPTIONS.put(IClientIoProvider.SO_REUSEADDR,
0062: Boolean.class);
0063: SUPPORTED_OPTIONS.put(IClientIoProvider.SO_KEEPALIVE,
0064: Boolean.class);
0065: SUPPORTED_OPTIONS.put(IClientIoProvider.TCP_NODELAY,
0066: Boolean.class);
0067: SUPPORTED_OPTIONS.put(IClientIoProvider.SO_LINGER,
0068: Integer.class);
0069: }
0070:
0071: // flag
0072: private boolean isLogicalOpen = true;
0073: private boolean isDisconnect = false;
0074:
0075: // socket
0076: private SocketChannel channel = null;
0077:
0078: // dispatcher
0079: private IoSocketDispatcher dispatcher = null;
0080:
0081: // memory management
0082: private IMemoryManager memoryManager = null;
0083:
0084: // receive & send queue
0085: private final IoQueue sendQueue = new IoQueue();
0086:
0087: // id
0088: private String id = null;
0089:
0090: // retry read
0091: private boolean isRetryRead = true;
0092:
0093: // timeouts
0094: private long idleTimeoutMillis = IClientIoProvider.DEFAULT_IDLE_TIMEOUT_MILLIS;
0095: private long idleTimeoutDateMillis = Long.MAX_VALUE;
0096: private long connectionTimeoutMillis = IClientIoProvider.DEFAULT_CONNECTION_TIMEOUT_MILLIS;
0097: private long connectionTimeoutDateMillis = Long.MAX_VALUE;
0098:
0099: // suspend flag
0100: private boolean suspendRead = false;
0101:
0102: // socket param
0103: private int soRcvbuf = 0;
0104:
0105: // statistics
0106: private long openTime = -1;
0107: private long lastTimeReceivedMillis = System.currentTimeMillis();
0108: // private long lastTimeSent = System.currentTimeMillis();
0109: private long receivedBytes = 0;
0110: private long sendBytes = 0;
0111:
0112: /**
0113: * constructor
0114: *
0115: * @param channel the underlying channel
0116: * @param idLocalPrefix the id namespace prefix
0117: * @param dispatcher the dispatcher
0118: * @throws IOException If some other I/O error occurs
0119: */
0120: @SuppressWarnings("unchecked")
0121: IoSocketHandler(SocketChannel channel,
0122: IoSocketDispatcher dispatcher, String connectionId)
0123: throws IOException {
0124: super (null);
0125:
0126: assert (channel != null);
0127: this .channel = channel;
0128:
0129: openTime = System.currentTimeMillis();
0130:
0131: channel.configureBlocking(false);
0132:
0133: this .dispatcher = dispatcher;
0134: this .id = connectionId;
0135:
0136: soRcvbuf = (Integer) getOption(DefaultIoProvider.SO_RCVBUF);
0137: }
0138:
0139: public void init(IIoHandlerCallback callbackHandler)
0140: throws IOException, SocketTimeoutException {
0141: setPreviousCallback(callbackHandler);
0142:
0143: blockUntilIsConnected();
0144: dispatcher.register(this , SelectionKey.OP_READ);
0145: }
0146:
0147: void setRetryRead(boolean isRetryRead) {
0148: this .isRetryRead = isRetryRead;
0149: }
0150:
0151: /**
0152: * {@inheritDoc}
0153: */
0154: public boolean reset() {
0155: try {
0156: sendQueue.drain();
0157: resumeRead();
0158:
0159: return super .reset();
0160: } catch (Exception e) {
0161: return false;
0162: }
0163: }
0164:
0165: void setMemoryManager(IMemoryManager memoryManager) {
0166: this .memoryManager = memoryManager;
0167: }
0168:
0169: @Override
0170: public String getId() {
0171: return id;
0172: }
0173:
0174: /**
0175: * {@inheritDoc}
0176: */
0177: @Override
0178: public int getPendingWriteDataSize() {
0179: return sendQueue.getSize() + super .getPendingWriteDataSize();
0180: }
0181:
0182: /**
0183: * {@inheritDoc}
0184: */
0185: @Override
0186: public boolean hasDataToSend() {
0187: return !sendQueue.isEmpty();
0188: }
0189:
0190: /**
0191: * {@inheritDoc}
0192: */
0193: public void setOption(String name, Object value) throws IOException {
0194: DefaultIoProvider.setOption(channel.socket(), name, value);
0195:
0196: if (name.equals(DefaultIoProvider.SO_RCVBUF)) {
0197: soRcvbuf = (Integer) value;
0198: }
0199: }
0200:
0201: /**
0202: * {@inheritDoc}
0203: */
0204: public Object getOption(String name) throws IOException {
0205: return DefaultIoProvider.getOption(channel.socket(), name);
0206: }
0207:
0208: /**
0209: * {@inheritDoc}
0210: */
0211: @SuppressWarnings("unchecked")
0212: public Map<String, Class> getOptions() {
0213: return Collections.unmodifiableMap(SUPPORTED_OPTIONS);
0214: }
0215:
0216: /**
0217: * {@inheritDoc}
0218: */
0219: public void setIdleTimeoutMillis(long timeoutMillis) {
0220: if (timeoutMillis <= 0) {
0221: LOG.warning("connection timeout " + timeoutMillis
0222: + " millis is invalid");
0223: return;
0224: }
0225:
0226: this .idleTimeoutMillis = timeoutMillis;
0227: this .idleTimeoutDateMillis = System.currentTimeMillis()
0228: + idleTimeoutMillis;
0229:
0230: if (idleTimeoutDateMillis < 0) {
0231: idleTimeoutDateMillis = Long.MAX_VALUE;
0232: }
0233:
0234: long period = idleTimeoutMillis;
0235: if (idleTimeoutMillis > 500) {
0236: period = idleTimeoutMillis / 5;
0237: }
0238:
0239: dispatcher.updateTimeoutCheckPeriod(period);
0240: }
0241:
0242: /**
0243: * sets the connection timeout
0244: *
0245: * @param timeout the connection timeout
0246: */
0247: public void setConnectionTimeoutMillis(long timeoutMillis) {
0248:
0249: if (timeoutMillis <= 0) {
0250: LOG.warning("connection timeout " + timeoutMillis
0251: + " millis is invalid");
0252: return;
0253: }
0254:
0255: this .connectionTimeoutMillis = timeoutMillis;
0256: this .connectionTimeoutDateMillis = System.currentTimeMillis()
0257: + connectionTimeoutMillis;
0258:
0259: long period = connectionTimeoutMillis;
0260: if (connectionTimeoutMillis > 500) {
0261: period = connectionTimeoutMillis / 5;
0262: }
0263:
0264: dispatcher.updateTimeoutCheckPeriod(period);
0265: }
0266:
0267: /**
0268: * gets the connection timeout
0269: *
0270: * @return the connection timeout
0271: */
0272: public long getConnectionTimeoutMillis() {
0273: return connectionTimeoutMillis;
0274: }
0275:
0276: /**
0277: * {@inheritDoc}
0278: */
0279: public long getIdleTimeoutMillis() {
0280: return idleTimeoutMillis;
0281: }
0282:
0283: /**
0284: * check the timeout
0285: *
0286: * @param currentMillis the current time
0287: * @return true, if the connection has been timed out
0288: */
0289: boolean checkIdleTimeout(Long currentMillis) {
0290: if (getRemainingMillisToIdleTimeout(currentMillis) <= 0) {
0291: getPreviousCallback().onIdleTimeout();
0292: return true;
0293: }
0294: return false;
0295: }
0296:
0297: /**
0298: * {@inheritDoc}
0299: */
0300: public long getRemainingMillisToIdleTimeout() {
0301: return getRemainingMillisToIdleTimeout(System
0302: .currentTimeMillis());
0303: }
0304:
0305: private long getRemainingMillisToIdleTimeout(long currentMillis) {
0306: long remaining = idleTimeoutDateMillis - currentMillis;
0307:
0308: // time out received
0309: if (remaining > 0) {
0310: return remaining;
0311:
0312: // ... yes
0313: } else {
0314:
0315: // ... but check if meantime data has been received!
0316: return (lastTimeReceivedMillis + idleTimeoutMillis)
0317: - currentMillis;
0318: }
0319: }
0320:
0321: /**
0322: * check if the underlying connection is timed out
0323: *
0324: * @param currentMillis the current time
0325: * @return true, if the connection has been timed out
0326: */
0327: boolean checkConnectionTimeout(Long currentMillis) {
0328: if (getRemainingMillisToConnectionTimeout(currentMillis) <= 0) {
0329: getPreviousCallback().onConnectionTimeout();
0330: return true;
0331: }
0332: return false;
0333: }
0334:
0335: /**
0336: * {@inheritDoc}
0337: */
0338: public long getRemainingMillisToConnectionTimeout() {
0339: return getRemainingMillisToConnectionTimeout(System
0340: .currentTimeMillis());
0341: }
0342:
0343: private long getRemainingMillisToConnectionTimeout(
0344: long currentMillis) {
0345: return connectionTimeoutDateMillis - currentMillis;
0346: }
0347:
0348: /**
0349: * check if the underyling connection is timed out
0350: *
0351: * @param current the current time
0352: * @return true, if the connection has been timed out
0353: */
0354: void checkConnection() {
0355: if (!channel.isOpen()) {
0356: getPreviousCallback().onConnectionAbnormalTerminated();
0357: }
0358: }
0359:
0360: void onConnectEvent() throws IOException {
0361: getPreviousCallback().onConnect();
0362: }
0363:
0364: int onReadableEvent() throws IOException {
0365: assert (IoSocketDispatcher.isDispatcherThread()) : "receiveQueue can only be accessed by the dispatcher thread";
0366:
0367: int read = 0;
0368:
0369: try {
0370: // read data from socket
0371: ByteBuffer[] received = readSocket();
0372:
0373: // handle the data
0374: if (received != null) {
0375: getPreviousCallback().onData(received);
0376: }
0377:
0378: // increase preallocated read memory if not sufficient
0379: checkPreallocatedReadMemory();
0380:
0381: } catch (ClosedChannelException ce) {
0382: close(false);
0383:
0384: } catch (Exception t) {
0385: if (LOG.isLoggable(Level.FINE)) {
0386: LOG
0387: .fine("["
0388: + getId()
0389: + "] error occured by handling readable event. reason: "
0390: + t.toString());
0391: }
0392: close(false);
0393:
0394: } catch (Error e) {
0395: close(false);
0396: throw e;
0397: }
0398:
0399: return read;
0400: }
0401:
0402: int onWriteableEvent() throws IOException {
0403: assert (IoSocketDispatcher.isDispatcherThread());
0404:
0405: int sent = 0;
0406:
0407: if (suspendRead) {
0408: if (LOG.isLoggable(Level.FINEST)) {
0409: LOG
0410: .finest("["
0411: + getId()
0412: + "] writeable event occured. update interested to none (because suspendRead is set) and write data to socket");
0413: }
0414: updateInterestedSetNonen();
0415:
0416: } else {
0417: // if (LOG.isLoggable(Level.FINEST)) {
0418: // LOG.finest("[" + getId() + "] writeable event occured. update interested to read and write data to socket");
0419: // }
0420: updateInterestedSetRead();
0421: }
0422:
0423: // write data to socket
0424: sent = writeSocket();
0425:
0426: // all data send? -> check for close
0427: if (sendQueue.isEmpty()) {
0428: if (shouldClosedPhysically()) {
0429: realClose();
0430: }
0431:
0432: // .. no, remaining data to send
0433: } else {
0434: if (LOG.isLoggable(Level.FINE)) {
0435: LOG
0436: .fine("["
0437: + id
0438: + "] remaining data to send. initiate sending of the remaining ("
0439: + DataConverter
0440: .toFormatedBytesSize(sendQueue
0441: .getSize()) + ")");
0442: }
0443:
0444: updateInterestedSetWrite();
0445: }
0446:
0447: if (LOG.isLoggable(Level.FINEST)) {
0448: LOG.finest("[" + getId() + "] writeable event handled");
0449: }
0450:
0451: return sent;
0452: }
0453:
0454: private void blockUntilIsConnected() throws IOException,
0455: SocketTimeoutException {
0456: // check/wait until channel is connected
0457: while (!getChannel().finishConnect()) {
0458: getChannel().configureBlocking(true);
0459: getChannel().finishConnect();
0460: getChannel().configureBlocking(false);
0461: }
0462: }
0463:
0464: private boolean shouldClosedPhysically() {
0465: // close handling (-> close() leads automatically to write, if there is data to write)
0466: if (!isLogicalOpen) {
0467:
0468: // send queue is emtpy -> close can be completed
0469: if (sendQueue.isEmpty()) {
0470: return true;
0471: }
0472: }
0473:
0474: return false;
0475: }
0476:
0477: /**
0478: * {@inheritDoc}
0479: */
0480: public void write(ByteBuffer[] buffers) throws IOException {
0481: if (buffers != null) {
0482: sendQueue.append(buffers);
0483: updateInterestedSetWrite();
0484: }
0485: }
0486:
0487: /**
0488: * {@inheritDoc}
0489: */
0490: @SuppressWarnings("unchecked")
0491: public void close(boolean immediate) throws IOException {
0492: if (immediate || sendQueue.isEmpty()) {
0493: realClose();
0494:
0495: } else {
0496: if (LOG.isLoggable(Level.FINE)) {
0497: LOG
0498: .fine("postpone close until remaning data to write ("
0499: + sendQueue.getSize()
0500: + ") has been written");
0501: }
0502:
0503: isLogicalOpen = false;
0504: updateInterestedSetWrite();
0505: }
0506: }
0507:
0508: private void realClose() {
0509: try {
0510: getDispatcher().deregister(this );
0511: } catch (Exception e) {
0512: if (LOG.isLoggable(Level.FINE)) {
0513: LOG.fine("error occured by deregistering connection "
0514: + id + " on dispatcher. reason: "
0515: + e.toString());
0516: }
0517: }
0518:
0519: try {
0520: channel.close();
0521: if (LOG.isLoggable(Level.FINE)) {
0522: LOG.fine("connection " + id + " has been closed");
0523: }
0524: } catch (Exception e) {
0525: if (LOG.isLoggable(Level.FINE)) {
0526: LOG.fine("error occured by closing connection " + id
0527: + " reason: " + e.toString());
0528: }
0529: }
0530:
0531: if (!isDisconnect) {
0532: isDisconnect = true;
0533: getPreviousCallback().onDisconnect();
0534: }
0535: }
0536:
0537: void onDispatcherClose() {
0538: getPreviousCallback().onConnectionAbnormalTerminated();
0539: }
0540:
0541: private void updateInterestedSetWrite()
0542: throws ClosedChannelException {
0543: try {
0544: dispatcher.updateInterestSet(this , SelectionKey.OP_READ
0545: | SelectionKey.OP_WRITE);
0546: } catch (IOException ioe) {
0547: if (LOG.isLoggable(Level.FINE)) {
0548: LOG
0549: .fine("couldn`t update interested set to write data on socket. Reason: "
0550: + ioe.toString());
0551: }
0552:
0553: try {
0554: dispatcher.deregister(this );
0555: } catch (Exception ignore) {
0556: }
0557:
0558: throw new ClosedChannelException();
0559: }
0560: }
0561:
0562: private void updateInterestedSetRead()
0563: throws ClosedChannelException {
0564: try {
0565: dispatcher.updateInterestSet(this , SelectionKey.OP_READ);
0566: } catch (IOException ioe) {
0567: if (LOG.isLoggable(Level.FINE)) {
0568: LOG
0569: .fine("couldn`t update interested set to read data. Reason: "
0570: + ioe.toString());
0571: }
0572:
0573: try {
0574: dispatcher.deregister(this );
0575: } catch (Exception ignore) {
0576: }
0577:
0578: throw new ClosedChannelException();
0579: }
0580: }
0581:
0582: private void updateInterestedSetNonen()
0583: throws ClosedChannelException {
0584: try {
0585: dispatcher.updateInterestSet(this , 0);
0586: } catch (IOException ioe) {
0587: if (LOG.isLoggable(Level.FINE)) {
0588: LOG
0589: .fine("could not update interested set to nonen. Reason: "
0590: + ioe.toString());
0591: }
0592:
0593: try {
0594: dispatcher.deregister(this );
0595: } catch (Exception ignore) {
0596: }
0597:
0598: throw new ClosedChannelException();
0599: }
0600: }
0601:
0602: /**
0603: * {@inheritDoc}
0604: */
0605: public boolean isOpen() {
0606: return channel.isOpen();
0607: }
0608:
0609: /**
0610: * return the underlying channel
0611: *
0612: * @return the underlying channel
0613: */
0614: public SocketChannel getChannel() {
0615: return channel;
0616: }
0617:
0618: IDispatcher<IoSocketHandler> getDispatcher() {
0619: return dispatcher;
0620: }
0621:
0622: @Override
0623: public void suspendRead() throws IOException {
0624: suspendRead = true;
0625:
0626: // update to write (why?). Reason:
0627: // * avoid race conditions in which current write need will be swallowed
0628: // * write falls back to `none interested set`
0629: updateInterestedSetWrite();
0630: }
0631:
0632: @Override
0633: public void resumeRead() throws IOException {
0634: if (suspendRead) {
0635: suspendRead = false;
0636:
0637: // update to write (why not read?). Reason:
0638: // * avoid race conditions in which current write need will be swallowed
0639: // * write falls back to `read interested set` if there is no data to write
0640: updateInterestedSetWrite();
0641: }
0642: }
0643:
0644: /**
0645: * reads socket into read queue
0646: *
0647: * @return the received data or <code>null</code>
0648: * @throws IOException If some other I/O error occurs
0649: * @throws ClosedChannelException if the underlying channel is closed
0650: */
0651: private ByteBuffer[] readSocket() throws IOException {
0652: assert (IoSocketDispatcher.isDispatcherThread()) : "receiveQueue can only be accessed by the dispatcher thread";
0653:
0654: ByteBuffer[] received = null;
0655:
0656: int read = 0;
0657: lastTimeReceivedMillis = System.currentTimeMillis();
0658:
0659: if (isOpen() && !suspendRead) {
0660:
0661: assert (memoryManager instanceof UnsynchronizedMemoryManager);
0662:
0663: ByteBuffer readBuffer = memoryManager
0664: .acquireMemoryStandardSizeOrPreallocated(soRcvbuf);
0665: int pos = readBuffer.position();
0666: int limit = readBuffer.limit();
0667:
0668: // read from channel
0669: try {
0670: read = channel.read(readBuffer);
0671:
0672: // exception occured while reading
0673: } catch (IOException ioe) {
0674: readBuffer.position(pos);
0675: readBuffer.limit(limit);
0676: memoryManager.recycleMemory(readBuffer);
0677:
0678: if (LOG.isLoggable(Level.FINE)) {
0679: LOG.fine("[" + id
0680: + "] error occured while reading channel: "
0681: + ioe.toString());
0682: }
0683:
0684: throw ioe;
0685: }
0686:
0687: // handle read
0688: switch (read) {
0689:
0690: // end-of-stream has been reached -> throw an exception
0691: case -1:
0692: memoryManager.recycleMemory(readBuffer);
0693: if (LOG.isLoggable(Level.FINE)) {
0694: LOG
0695: .fine("["
0696: + id
0697: + "] channel has reached end-of-stream (maybe closed by peer)");
0698: }
0699: ClosedChannelException cce = new ClosedChannelException();
0700: throw cce;
0701:
0702: // no bytes read recycle read buffer and do nothing
0703: case 0:
0704: memoryManager.recycleMemory(readBuffer);
0705: return null;
0706:
0707: // bytes available (read < -1 is not handled)
0708: default:
0709: int remainingFreeSize = readBuffer.remaining();
0710: ByteBuffer dataBuffer = memoryManager
0711: .extractAndRecycleMemory(readBuffer, read);
0712:
0713: if (received == null) {
0714: received = new ByteBuffer[1];
0715: received[0] = dataBuffer;
0716: }
0717:
0718: receivedBytes += read;
0719:
0720: if (LOG.isLoggable(Level.FINE)) {
0721: LOG.fine("["
0722: + id
0723: + "] received ("
0724: + (dataBuffer.limit() - dataBuffer
0725: .position())
0726: + " bytes, total "
0727: + (receivedBytes + read)
0728: + " bytes): "
0729: + DataConverter.toTextOrHexString(
0730: new ByteBuffer[] { dataBuffer
0731: .duplicate() }, "UTF-8",
0732: MAXSIZE_LOG_READ));
0733: }
0734:
0735: // whole read buffer has been required -> repeat the read, because there could be more data to read
0736: if ((remainingFreeSize == 0) && isRetryRead) {
0737:
0738: // but just i case, if already read size is smaller than the preallocation size
0739: if (read < memoryManager
0740: .gettPreallocationBufferSize()) {
0741: if (LOG.isLoggable(Level.FINE)) {
0742: LOG
0743: .fine("["
0744: + id
0745: + "] complete read buffer has been used, initiating repeated read");
0746: }
0747:
0748: ByteBuffer[] repeatedReceived = readSocket();
0749: if (repeatedReceived != null) {
0750: ByteBuffer[] newReceived = new ByteBuffer[received.length + 1];
0751: newReceived[0] = dataBuffer;
0752: System.arraycopy(repeatedReceived, 0,
0753: newReceived, 1,
0754: repeatedReceived.length);
0755: received = newReceived;
0756:
0757: return received;
0758:
0759: } else {
0760: return received;
0761: }
0762:
0763: } else {
0764: return received;
0765: }
0766: }
0767:
0768: return received;
0769: }
0770:
0771: } else {
0772: if (LOG.isLoggable(Level.FINEST)) {
0773: if (!isOpen()) {
0774: LOG
0775: .finest("["
0776: + getId()
0777: + "] couldn't read socket because socket is already closed");
0778: }
0779:
0780: if (suspendRead) {
0781: LOG.finest("[" + getId()
0782: + "] read is suspended, do nothing");
0783: }
0784: }
0785:
0786: return null;
0787: }
0788: }
0789:
0790: /**
0791: * check if preallocated read buffer size is sufficient. if not increase it
0792: */
0793: private void checkPreallocatedReadMemory() {
0794: assert (IoSocketDispatcher.isDispatcherThread());
0795:
0796: memoryManager.preallocate();
0797: }
0798:
0799: /**
0800: * writes the content of the send queue to the socket
0801: *
0802: * @throws IOException If some other I/O error occurs
0803: * @throws ClosedChannelException if the underlying channel is closed
0804: */
0805: @SuppressWarnings("unchecked")
0806: private int writeSocket() throws IOException {
0807: assert (IoSocketDispatcher.isDispatcherThread());
0808:
0809: int sent = 0;
0810:
0811: ////////////////////////////////////////////////////////////
0812: // Why hasn`t channel.write(ByteBuffer[]) been used??
0813: //
0814: // sendBytes += channel.write(data.toArray(new ByteBuffer[data.size()])) doesn`t
0815: // work correct under WinXP_SP2 & Sun JDK 1.6.0_01-b06 (and other configurations?).
0816: // The channel reports that x bytes have been written, but in some situations duplicated
0817: // data appears on the line (caused by the channel impl?!)
0818: // This behaviour doesn`t appear under Suse9.1/Intel & Sun JDK 1.5.0_08
0819: ////////////////////////////////////////////////////////////
0820:
0821: if (isOpen()) {
0822: ByteBuffer[] buffers = sendQueue.drain();
0823: if (buffers == null) {
0824: return 0;
0825: }
0826:
0827: boolean hasUnwrittenBuffers = false;
0828: try {
0829: for (int i = 0; i < buffers.length; i++) {
0830:
0831: if (buffers[i] != null) {
0832: int writeSize = buffers[i].remaining();
0833:
0834: // data to write for this buffer?
0835: if (writeSize > 0) {
0836: if (LOG.isLoggable(Level.FINE)) {
0837: if (LOG.isLoggable(Level.FINE)) {
0838: LOG
0839: .fine("["
0840: + id
0841: + "] sending ("
0842: + writeSize
0843: + " bytes): "
0844: + DataConverter
0845: .toTextOrHexString(
0846: buffers[i]
0847: .duplicate(),
0848: "UTF-8",
0849: 500));
0850: }
0851: }
0852:
0853: // write to socket (internal out buffer)
0854: try {
0855: int written = channel.write(buffers[i]);
0856: sent += written;
0857: sendBytes += written;
0858:
0859: // all data written?
0860: if (written == writeSize) {
0861: try {
0862: // notify the io handler that data has been written
0863: getPreviousCallback()
0864: .onWritten(buffers[i]);
0865: } catch (Exception e) {
0866: if (LOG.isLoggable(Level.FINE)) {
0867: LOG
0868: .fine("error occured by notifying that buffer has been written "
0869: + e
0870: .toString());
0871: }
0872: }
0873:
0874: buffers[i] = null;
0875:
0876: // ... no, return byte buffer to send queue
0877: } else {
0878: hasUnwrittenBuffers = true; // see finally block
0879:
0880: if (LOG.isLoggable(Level.FINE)) {
0881: LOG
0882: .fine("["
0883: + id
0884: + "] "
0885: + written
0886: + " of "
0887: + (writeSize - written)
0888: + " bytes has been sent ("
0889: + DataConverter
0890: .toFormatedBytesSize((writeSize - written))
0891: + ")");
0892: }
0893: break;
0894: }
0895:
0896: } catch (IOException ioe) {
0897:
0898: if (LOG.isLoggable(Level.FINE)) {
0899: LOG
0900: .fine("error "
0901: + ioe.toString()
0902: + " occured by writing "
0903: + DataConverter
0904: .toTextOrHexString(
0905: buffers[i]
0906: .duplicate(),
0907: "US-ASCII",
0908: 500));
0909: }
0910:
0911: try {
0912: getPreviousCallback()
0913: .onWriteException(ioe,
0914: buffers[i]);
0915: } catch (Exception e) {
0916: if (LOG.isLoggable(Level.FINE)) {
0917: LOG
0918: .fine("error occured by notifying that write exception ("
0919: + e.toString()
0920: + ") has been occured "
0921: + e.toString());
0922: }
0923: }
0924: buffers[i] = null;
0925:
0926: return sent;
0927: }
0928: }
0929: }
0930: }
0931: } finally {
0932:
0933: // not all data written -> return array into (head of) queue
0934: if (hasUnwrittenBuffers) {
0935: sendQueue.addFirst(buffers);
0936: }
0937: }
0938:
0939: } else {
0940: if (LOG.isLoggable(Level.FINEST)) {
0941: if (!isOpen()) {
0942: LOG
0943: .finest("["
0944: + getId()
0945: + "] couldn't write send queue to socket because socket is already closed (sendQueuesize="
0946: + DataConverter
0947: .toFormatedBytesSize(sendQueue
0948: .getSize()) + ")");
0949: }
0950:
0951: if (sendQueue.isEmpty()) {
0952: LOG
0953: .finest("["
0954: + getId()
0955: + "] nothing to write, because send queue is empty ");
0956: }
0957: }
0958: }
0959:
0960: return sent;
0961: }
0962:
0963: /**
0964: * {@inheritDoc}
0965: */
0966: @Override
0967: public final InetAddress getLocalAddress() {
0968: return channel.socket().getLocalAddress();
0969: }
0970:
0971: /**
0972: * {@inheritDoc}
0973: */
0974: @Override
0975: public final int getLocalPort() {
0976: return channel.socket().getLocalPort();
0977: }
0978:
0979: /**
0980: * {@inheritDoc}
0981: */
0982: @Override
0983: public final InetAddress getRemoteAddress() {
0984: return channel.socket().getInetAddress();
0985: }
0986:
0987: /**
0988: * {@inheritDoc}
0989: */
0990: @Override
0991: public final int getRemotePort() {
0992: return channel.socket().getPort();
0993: }
0994:
0995: /**
0996: * {@inheritDoc}
0997: */
0998: public void flushOutgoing() {
0999:
1000: }
1001:
1002: /**
1003: * {@inheritDoc}
1004: */
1005: @Override
1006: public String toString() {
1007: try {
1008: return "("
1009: + channel.socket().getInetAddress().toString()
1010: + ":"
1011: + channel.socket().getPort()
1012: + " -> "
1013: + channel.socket().getLocalAddress().toString()
1014: + ":"
1015: + channel.socket().getLocalPort()
1016: + ")"
1017: + " received="
1018: + DataConverter.toFormatedBytesSize(receivedBytes)
1019: + ", sent="
1020: + DataConverter.toFormatedBytesSize(sendBytes)
1021: + ", age="
1022: + DataConverter.toFormatedDuration(System
1023: .currentTimeMillis()
1024: - openTime)
1025: + ", lastReceived="
1026: + DataConverter
1027: .toFormatedDate(lastTimeReceivedMillis)
1028: + ", sendQueueSize="
1029: + DataConverter.toFormatedBytesSize(sendQueue
1030: .getSize()) + " [" + id + "]";
1031: } catch (Throwable e) {
1032: return super.toString();
1033: }
1034: }
1035: }
|