0001: /*
0002: * This file is part of the QuickServer library
0003: * Copyright (C) QuickServer.org
0004: *
0005: * Use, modification, copying and distribution of this software is subject to
0006: * the terms and conditions of the GNU Lesser General Public License.
0007: * You should have received a copy of the GNU LGP License along with this
0008: * library; if not, you can download a copy from <http://www.quickserver.org/>.
0009: *
0010: * For questions, suggestions, bug-reports, enhancement-requests etc.
0011: * visit http://www.quickserver.org
0012: *
0013: */
0014:
0015: package org.quickserver.net.server.impl;
0016:
0017: import org.quickserver.net.server.*;
0018: import org.quickserver.net.*;
0019: import org.quickserver.util.*;
0020: import org.quickserver.util.io.*;
0021:
0022: import java.io.*;
0023: import java.net.*;
0024: import java.util.*;
0025: import java.util.logging.*;
0026:
0027: import java.nio.*;
0028: import java.nio.channels.*;
0029:
0030: public class NonBlockingClientHandler extends BasicClientHandler {
0031: private static final Logger logger = Logger
0032: .getLogger(NonBlockingClientHandler.class.getName());
0033:
0034: protected ClientWriteHandler clientWriteHandler; //v1.4.5
0035: private SocketChannel socketChannel;
0036:
0037: protected ArrayList readByteBuffer = new ArrayList();
0038: protected ArrayList writeByteBuffer = new ArrayList();
0039:
0040: protected SelectionKey selectionKey;
0041:
0042: protected volatile int threadAccessCount = 0;
0043: protected volatile boolean willReturn;
0044: protected volatile boolean waitingForFinalWrite;
0045:
0046: private static int maxThreadAccessCount = 3; //one for each event ACCEPT, WRITE, READ
0047: private static boolean wakeupSelectorAfterRegisterWrite = true;
0048: private static boolean wakeupSelectorAfterRegisterRead = true;
0049:
0050: /**
0051: * Sets the flag to wakeup Selector After RegisterForWrite is called.
0052: * @since 1.4.7
0053: */
0054: public static void setWakeupSelectorAfterRegisterWrite(boolean flag) {
0055: wakeupSelectorAfterRegisterWrite = flag;
0056: }
0057:
0058: /**
0059: * Returns wakeupSelectorAfterRegisterWrite the flag that controls if wakeup is called on Selector
0060: * after RegisterForWrite is called.
0061: * @since 1.4.7
0062: */
0063: public static boolean getWakeupSelectorAfterRegisterWrite() {
0064: return wakeupSelectorAfterRegisterWrite;
0065: }
0066:
0067: /**
0068: * Sets the flag to wakeup Selector After RegisterForRead is called.
0069: * @since 1.4.7
0070: */
0071: public static void setWakeupSelectorAfterRegisterRead(boolean flag) {
0072: wakeupSelectorAfterRegisterRead = flag;
0073: }
0074:
0075: /**
0076: * Returns wakeupSelectorAfterRegisterRead the flag that controls if wakeup is called on Selector
0077: * after RegisterForRead is called.
0078: * @since 1.4.7
0079: */
0080: public static boolean getWakeupSelectorAfterRegisterRead() {
0081: return wakeupSelectorAfterRegisterRead;
0082: }
0083:
0084: /**
0085: * Sets the maximum count of thread allowed to run objects of this class at a time.
0086: * @since 1.4.7
0087: */
0088: public static void setMaxThreadAccessCount(int count) {
0089: if (count < 3 && count != -1)
0090: throw new IllegalArgumentException(
0091: "Value should be >=3 or -1");
0092: maxThreadAccessCount = count;
0093: }
0094:
0095: /**
0096: * Returns the maximum count of thread allowed to run objects of this class at a time.
0097: * @since 1.4.7
0098: */
0099: public static int getMaxThreadAccessCount() {
0100: return maxThreadAccessCount;
0101: }
0102:
0103: //v1.4.7
0104: private ByteBufferOutputStream byteBufferOutputStream;
0105:
0106: public NonBlockingClientHandler(int instanceCount) {
0107: super (instanceCount);
0108: }
0109:
0110: public NonBlockingClientHandler() {
0111: super ();
0112: }
0113:
0114: public void clean() {
0115: logger.finest("Starting clean - " + getName());
0116: if (threadAccessCount != 0) {
0117: logger.warning("Thread Access Count was not 0!: "
0118: + threadAccessCount);
0119: if (Assertion.isEnabled()) {
0120: assertionSystemExit();
0121: }
0122: threadAccessCount = 0;
0123: }
0124:
0125: while (readByteBuffer.isEmpty() == false) {
0126: try {
0127: getServer().getByteBufferPool().returnObject(
0128: readByteBuffer.remove(0));
0129: } catch (Exception er) {
0130: appLogger
0131: .warning("Error in returning read ByteBuffer to pool: "
0132: + er);
0133: break;
0134: }
0135: }
0136:
0137: while (writeByteBuffer.isEmpty() == false) {
0138: try {
0139: getServer().getByteBufferPool().returnObject(
0140: writeByteBuffer.remove(0));
0141: } catch (Exception er) {
0142: appLogger
0143: .warning("Error in returning write ByteBuffer to pool: "
0144: + er);
0145: break;
0146: }
0147: }
0148:
0149: if (selectionKey != null) {
0150: selectionKey.cancel();
0151: selectionKey.selector().wakeup();
0152: selectionKey = null;
0153: }
0154: willReturn = false;
0155: waitingForFinalWrite = false;
0156: socketChannel = null;
0157: if (byteBufferOutputStream != null) {
0158: byteBufferOutputStream.close();
0159: }
0160:
0161: super .clean();
0162:
0163: clientWriteHandler = null;//1.4.5
0164: byteBufferOutputStream = null;
0165:
0166: logger.finest("Finished clean - " + getName());
0167: }
0168:
0169: protected void finalize() throws Throwable {
0170: clean();
0171: super .finalize();
0172: }
0173:
0174: public void handleClient(TheClient theClient) {
0175: super .handleClient(theClient);
0176: setClientWriteHandler(theClient.getClientWriteHandler()); //v1.4.5
0177: setSocketChannel(theClient.getSocketChannel());//1.4.5
0178: }
0179:
0180: protected void setInputStream(InputStream in) throws IOException {
0181: this .in = in;
0182: if (getDataMode(DataType.IN) == DataMode.STRING) {
0183: b_in = null;
0184: o_in = null;
0185: bufferedReader = null;
0186: } else if (getDataMode(DataType.IN) == DataMode.OBJECT) {
0187: b_in = null;
0188: bufferedReader = null;
0189: o_in = new ObjectInputStream(in);
0190: } else if (getDataMode(DataType.IN) == DataMode.BYTE
0191: || getDataMode(DataType.IN) == DataMode.BINARY) {
0192: o_in = null;
0193: bufferedReader = null;
0194: b_in = null;
0195: }
0196: }
0197:
0198: public BufferedReader getBufferedReader() {
0199: throw new IllegalStateException(
0200: "Access to BufferedReader in not allowed in Non-Blocking mode!");
0201: }
0202:
0203: public void closeConnection() {
0204: synchronized (this ) {
0205: if (connection == false)
0206: return;
0207: if (waitingForFinalWrite)
0208: return;
0209: if (getSelectionKey() != null
0210: && getSelectionKey().isValid() && lost == false) {
0211: waitingForFinalWrite = true;
0212: } else {
0213: connection = false;
0214: }
0215: }
0216:
0217: try {
0218: if (getSocketChannel() != null && socket != null) {
0219: if (waitingForFinalWrite) {
0220: try {
0221: waitTillFullyWritten();
0222: } catch (Exception error) {
0223: logger
0224: .warning("Error in waitingForFinalWrite : "
0225: + error);
0226: if (logger.isLoggable(Level.FINE)) {
0227: logger.fine("StackTrace:\n"
0228: + MyString.getStackTrace(error));
0229: }
0230: } finally {
0231: connection = false;
0232: byteBufferOutputStream.forceNotify();
0233: getSelectionKey().cancel();
0234: }
0235: }//end of waitingForFinalWrite
0236:
0237: synchronized (this ) {
0238: if (hasEvent(ClientEvent.MAX_CON) == false) {
0239: notifyCloseOrLost();
0240: }
0241: if (getSocketChannel().isOpen()) {
0242: logger.finest("Closing SocketChannel");
0243: getSocketChannel().close();
0244: }
0245: }
0246: }
0247: if (getServer() != null) {
0248: getServer().getSelector().wakeup();
0249: }
0250: } catch (IOException e) {
0251: logger.warning("Error in closeConnection : " + e);
0252: if (logger.isLoggable(Level.FINE)) {
0253: logger
0254: .fine("StackTrace:\n"
0255: + MyString.getStackTrace(e));
0256: }
0257: } catch (NullPointerException npe) {
0258: logger.fine("NullPointerException: " + npe);
0259: if (logger.isLoggable(Level.FINE)) {
0260: logger.fine("StackTrace:\n"
0261: + MyString.getStackTrace(npe));
0262: }
0263: }
0264: }
0265:
0266: /**
0267: * waitTillFullyWritten
0268: * @since 1.4.7
0269: */
0270: public void waitTillFullyWritten() {
0271: Object waitLock = new Object();
0272: if (byteBufferOutputStream.isDataAvailableForWrite(waitLock)) {
0273: if (ByteBufferOutputStream.isLoggable(Level.FINEST)) {
0274: logger.finest("Waiting " + getName());
0275: }
0276: try {
0277: synchronized (waitLock) {
0278: waitLock.wait(1000 * 60 * 2);//2 min max
0279: }
0280: } catch (InterruptedException ie) {
0281: logger.warning("Error: " + ie);
0282: }
0283: if (ByteBufferOutputStream.isLoggable(Level.FINEST)) {
0284: logger.finest("Done. " + getName());
0285: }
0286: }
0287: }
0288:
0289: public void run() {
0290: if (unprocessedClientEvents.size() == 0) {
0291: logger.finest("No unprocessed ClientEvents!");
0292: return;
0293: }
0294:
0295: synchronized (this ) {
0296: if (willReturn) {
0297: return;
0298: } else {
0299: threadAccessCount++;
0300: }
0301: }
0302:
0303: ClientEvent currentEvent = (ClientEvent) unprocessedClientEvents
0304: .remove(0);
0305:
0306: if (logger.isLoggable(Level.FINEST)) {
0307: StringBuffer sb = new StringBuffer();
0308: sb.append("Running ").append(getName());
0309: sb.append(" using ");
0310: sb.append(Thread.currentThread().getName());
0311: sb.append(" for ");
0312:
0313: synchronized (clientEvents) {
0314: if (clientEvents.size() > 1) {
0315: sb.append(currentEvent + ", Current Events - "
0316: + clientEvents);
0317: } else {
0318: sb.append(currentEvent);
0319: }
0320: }
0321: logger.finest(sb.toString());
0322: }
0323:
0324: if (currentEvent == null) {
0325: threadEvent.set(null);
0326: return;
0327: } else {
0328: threadEvent.set(currentEvent);
0329: }
0330:
0331: try {
0332: if (maxThreadAccessCount != -1
0333: && threadAccessCount > maxThreadAccessCount) {
0334: logger.warning("ThreadAccessCount can't go beyond "
0335: + maxThreadAccessCount + ": "
0336: + threadAccessCount);
0337: if (Assertion.isEnabled()) {
0338: throw new AssertionError(
0339: "ThreadAccessCount can't go beyond "
0340: + maxThreadAccessCount + ": "
0341: + threadAccessCount);
0342: }
0343: return;
0344: }
0345:
0346: if (socket == null)
0347: throw new SocketException("Socket was null!");
0348:
0349: if (getThreadEvent() == ClientEvent.ACCEPT
0350: || getThreadEvent() == ClientEvent.MAX_CON) {
0351: prepareForRun();
0352: Assertion.affirm(willReturn == false,
0353: "WillReturn has to be false!: " + willReturn);
0354: }
0355:
0356: if (getThreadEvent() == ClientEvent.MAX_CON) {
0357: processMaxConnection(currentEvent);
0358: }
0359:
0360: try {
0361: if (getThreadEvent() == ClientEvent.ACCEPT) {
0362: registerForRead();
0363: clientEventHandler.gotConnected(this );
0364:
0365: if (authorised == false) {
0366: if (clientAuthenticationHandler == null
0367: && authenticator == null) {
0368: authorised = true;
0369: logger.finest("No Authenticator "
0370: + getName() + " so return thread.");
0371: } else {
0372: if (clientAuthenticationHandler != null) {
0373: AuthStatus authStatus = null;
0374: do {
0375: authStatus = processAuthorisation();
0376: } while (authStatus == AuthStatus.FAILURE);
0377:
0378: if (authStatus == AuthStatus.SUCCESS)
0379: authorised = true;
0380: } else {
0381: processAuthorisation();
0382: }
0383: if (authorised)
0384: logger.finest("Authentication done "
0385: + getName()
0386: + ", so return thread.");
0387: else
0388: logger
0389: .finest("askAuthentication() done "
0390: + getName()
0391: + ", so return thread.");
0392: }
0393: }//end authorised
0394: returnThread(); //return thread to pool
0395: return;
0396: }
0397:
0398: if (connection && getThreadEvent() == ClientEvent.READ) {
0399: if (processRead())
0400: return;
0401: }
0402:
0403: if (connection && getThreadEvent() == ClientEvent.WRITE) {
0404: if (processWrite())
0405: return;
0406: }
0407:
0408: } catch (SocketException e) {
0409: appLogger.finest("SocketException - Client ["
0410: + getHostAddress() + "]: " + e.getMessage());
0411: //e.printStackTrace();
0412: lost = true;
0413: } catch (AppException e) {
0414: //errors from Application
0415: appLogger.finest("AppException "
0416: + Thread.currentThread().getName() + ": "
0417: + e.getMessage());
0418: } catch (javax.net.ssl.SSLException e) {
0419: lost = true;
0420: if (Assertion.isEnabled()) {
0421: appLogger.info("SSLException - Client ["
0422: + getHostAddress() + "] "
0423: + Thread.currentThread().getName() + ": "
0424: + e);
0425: } else {
0426: appLogger.warning("SSLException - Client ["
0427: + getHostAddress() + "]: " + e);
0428: }
0429: } catch (ConnectionLostException e) {
0430: lost = true;
0431: if (e.getMessage() != null)
0432: appLogger.finest("Connection lost "
0433: + Thread.currentThread().getName() + ": "
0434: + e.getMessage());
0435: else
0436: appLogger.finest("Connection lost "
0437: + Thread.currentThread().getName());
0438: } catch (ClosedChannelException e) {
0439: lost = true;
0440: appLogger.finest("Channel closed "
0441: + Thread.currentThread().getName() + ": " + e);
0442: } catch (IOException e) {
0443: lost = true;
0444: appLogger.fine("IOError "
0445: + Thread.currentThread().getName() + ": " + e);
0446: } catch (AssertionError er) {
0447: logger.warning("[AssertionError] " + getName() + " "
0448: + er);
0449: if (logger.isLoggable(Level.FINEST)) {
0450: logger.finest("StackTrace "
0451: + Thread.currentThread().getName() + ": "
0452: + MyString.getStackTrace(er));
0453: }
0454: assertionSystemExit();
0455: } catch (Error er) {
0456: logger.warning("[Error] " + er);
0457: if (logger.isLoggable(Level.FINEST)) {
0458: logger.finest("StackTrace "
0459: + Thread.currentThread().getName() + ": "
0460: + MyString.getStackTrace(er));
0461: }
0462: if (Assertion.isEnabled()) {
0463: assertionSystemExit();
0464: }
0465: lost = true;
0466: } catch (RuntimeException re) {
0467: logger.warning("[RuntimeException] "
0468: + MyString.getStackTrace(re));
0469: if (Assertion.isEnabled()) {
0470: assertionSystemExit();
0471: }
0472: lost = true;
0473: }
0474:
0475: if (getThreadEvent() != ClientEvent.MAX_CON) {
0476: notifyCloseOrLost();
0477: }
0478:
0479: if (connection) {
0480: logger.finest(Thread.currentThread().getName()
0481: + " calling closeConnection()");
0482: closeConnection();
0483: }
0484:
0485: if (connection == true && lost == true
0486: && waitingForFinalWrite) {
0487: byteBufferOutputStream.forceNotify();
0488: }
0489: } catch (javax.net.ssl.SSLException se) {
0490: logger.warning("SSLException "
0491: + Thread.currentThread().getName() + " - " + se);
0492: } catch (IOException ie) {
0493: logger.warning("IOError "
0494: + Thread.currentThread().getName()
0495: + " - Closing Client : " + ie);
0496: } catch (RuntimeException re) {
0497: logger.warning("[RuntimeException] " + getName() + " "
0498: + Thread.currentThread().getName() + " - "
0499: + MyString.getStackTrace(re));
0500: if (Assertion.isEnabled()) {
0501: assertionSystemExit();
0502: }
0503: } catch (Exception e) {
0504: logger.warning("Error " + Thread.currentThread().getName()
0505: + " - Event:" + getThreadEvent() + " - Socket:"
0506: + socket + " : " + e);
0507: logger.fine("StackTrace: " + getName() + "\n"
0508: + MyString.getStackTrace(e));
0509: if (Assertion.isEnabled()) {
0510: assertionSystemExit();
0511: }
0512: } catch (Error e) {
0513: logger.warning("Error " + Thread.currentThread().getName()
0514: + " - Event:" + getThreadEvent() + " - Socket:"
0515: + socket + " : " + e);
0516: logger.fine("StackTrace: " + getName() + "\n"
0517: + MyString.getStackTrace(e));
0518: if (Assertion.isEnabled()) {
0519: assertionSystemExit();
0520: }
0521: }
0522:
0523: synchronized (this ) {
0524: try {
0525: if (getSelectionKey() != null
0526: && getSelectionKey().isValid()) {
0527: logger.finest("Canceling SelectionKey");
0528: getSelectionKey().cancel();
0529: }
0530:
0531: if (socket != null && socket.isClosed() == false) {
0532: logger.finest("Closing Socket");
0533: socket.close();
0534: }
0535:
0536: if (getSocketChannel() != null
0537: && getSocketChannel().isOpen()) {
0538: logger.finest("Closing SocketChannel");
0539: socketChannel.close();
0540: }
0541: } catch (Exception re) {
0542: logger.warning("Error closing Socket/Channel: " + re);
0543: }
0544: }//end synchronized
0545:
0546: willClean = true;
0547: returnClientData();
0548:
0549: boolean returnClientHandler = false;
0550: synchronized (lockObj) {
0551: returnThread();
0552: returnClientHandler = checkReturnClientHandler();
0553: }
0554:
0555: if (returnClientHandler) {
0556: returnClientHandler(); //return to pool
0557: }
0558: }
0559:
0560: protected boolean checkReturnClientHandler() {
0561: if (willReturn == false) {
0562: willReturn = true;
0563: return true;
0564: }
0565: return false;
0566: }
0567:
0568: /**
0569: * Process read
0570: * @return value indicates if the thread should return form run()
0571: */
0572: private boolean processRead() throws Exception, AppException {
0573: int count = 0;
0574: int fullCount = 0;
0575: ByteBuffer buffer = (ByteBuffer) getServer()
0576: .getByteBufferPool().borrowObject();
0577:
0578: while (true) {
0579: try {
0580: count = getSocketChannel().read(buffer);
0581: if (count <= 0) {
0582: //logger.finest("SocketChannel read was "+count+"!");
0583: getServer().getByteBufferPool()
0584: .returnObject(buffer);
0585: buffer = null;
0586: break;
0587: } else {
0588: fullCount += count;
0589: }
0590:
0591: buffer.flip(); // Make readable
0592: readByteBuffer.add(buffer);
0593:
0594: buffer = (ByteBuffer) getServer().getByteBufferPool()
0595: .borrowObject();
0596: } catch (Exception error) {
0597: logger.finest("Error in data read: " + error);
0598: lost = true;
0599: synchronized (getInputStream()) {
0600: getInputStream().notifyAll();
0601: }
0602: throw error;
0603: } finally {
0604: if (buffer != null && count <= 0) {
0605: getServer().getByteBufferPool()
0606: .returnObject(buffer);
0607: buffer = null;
0608: }
0609: }
0610: }//end while
0611:
0612: if (count < 0) {
0613: logger.finest("SocketChannel read was " + count + "!");
0614: lost = true;
0615: synchronized (getInputStream()) {
0616: getInputStream().notifyAll();
0617: }
0618: } else {
0619: logger.finest(fullCount + " bytes read");
0620: if (fullCount != 0) {
0621: updateLastCommunicationTime();
0622: synchronized (getInputStream()) {
0623: getInputStream().notify(); //if any are waiting
0624: }
0625: if (hasEvent(ClientEvent.ACCEPT) == false) {
0626: processGotDataInBuffers();
0627: }
0628: }
0629:
0630: //check if any data was read but not yet processed
0631: while (getInputStream().available() > 0) {
0632: logger.finest("Sending again for processing...");
0633: if (hasEvent(ClientEvent.ACCEPT) == false) {
0634: processGotDataInBuffers();
0635: break;
0636: } else {
0637: synchronized (getInputStream()) {
0638: getInputStream().notifyAll();
0639: }
0640: Thread.sleep(100);
0641: }
0642: }
0643:
0644: if (connection) {
0645: registerForRead();
0646: //getSelectionKey().selector().wakeup();
0647: returnThread(); //return to pool
0648: return true;
0649: }
0650: }//end of else
0651: logger
0652: .finest("We don't have connection, lets return all resources.");
0653: return false;
0654: }
0655:
0656: /**
0657: * Process write
0658: * @return value indicates if the thread should return form run()
0659: */
0660: private boolean processWrite() throws IOException {
0661: updateLastCommunicationTime();
0662:
0663: boolean flag = byteBufferOutputStream.writeAllByteBuffer();
0664:
0665: if (flag == false) {
0666: registerWrite();
0667: } else if (/*flag==true && */clientWriteHandler != null) {
0668: clientWriteHandler.handleWrite(this );
0669: }
0670:
0671: if (connection) {
0672: returnThread(); //return to pool
0673: return true;
0674: } else {
0675: logger
0676: .finest("We don't have connection, lets return all resources.");
0677: }
0678: return false;
0679: }
0680:
0681: protected void returnThread() {
0682: threadAccessCount--;
0683: Assertion.affirm(threadAccessCount >= 0,
0684: "ThreadAccessCount went less the 0! Value: "
0685: + threadAccessCount);
0686: //return is done at ClientThread end
0687: removeEvent((ClientEvent) threadEvent.get());
0688: }
0689:
0690: protected void returnClientHandler() {
0691: logger.finest(getName());
0692: try {
0693: for (int i = 0; threadAccessCount != 0; i++) {
0694: if (i == 100) {
0695: logger
0696: .warning("ClientHandler must have got into a loop waiting for thread to free up! ThreadAccessCount="
0697: + threadAccessCount);
0698: threadAccessCount = 0;
0699: if (Assertion.isEnabled()) {
0700: assertionSystemExit();
0701: } else {
0702: break;
0703: }
0704: }
0705: if (threadAccessCount <= 0)
0706: break;
0707:
0708: logger.finest("Waiting for other thread of "
0709: + getName() + " to finish");
0710: Thread.sleep(60);
0711: }
0712: } catch (InterruptedException ie) {
0713: appLogger.warning("InterruptedException: " + ie);
0714: }
0715: super .returnClientHandler();
0716: }
0717:
0718: public void setDataMode(DataMode dataMode, DataType dataType)
0719: throws IOException {
0720: if (getDataMode(dataType) == dataMode)
0721: return;
0722:
0723: appLogger.fine("Setting Type:" + dataType + ", Mode:"
0724: + dataMode);
0725: super .checkDataModeSet(dataMode, dataType);
0726:
0727: setDataModeNonBlocking(dataMode, dataType);
0728: }
0729:
0730: private void setDataModeNonBlocking(DataMode dataMode,
0731: DataType dataType) throws IOException {
0732: logger.finest("ENTER");
0733: if (dataMode == DataMode.STRING) {
0734: if (dataType == DataType.OUT) {
0735: if (dataModeOUT == DataMode.BYTE
0736: || dataModeOUT == DataMode.BINARY) {
0737: dataModeOUT = dataMode;
0738: } else if (dataModeOUT == DataMode.OBJECT) {
0739: dataModeOUT = dataMode;
0740: o_out.flush();
0741: o_out = null;
0742: b_out = new BufferedOutputStream(out);
0743: } else {
0744: Assertion.affirm(false,
0745: "Unknown DataType.OUT DataMode - "
0746: + dataModeOUT);
0747: }
0748: Assertion.affirm(b_out != null,
0749: "BufferedOutputStream is still null!");
0750: Assertion.affirm(o_out == null,
0751: "ObjectOutputStream is still not null!");
0752: } else if (dataType == DataType.IN) {
0753: dataModeIN = dataMode;
0754:
0755: if (o_in != null) {
0756: if (o_in.available() != 0)
0757: logger
0758: .warning("Data looks to be present in ObjectInputStream");
0759: o_in = null;
0760: }
0761: b_in = null;
0762: bufferedReader = null;
0763: //input stream will work
0764: Assertion.affirm(in != null,
0765: "InputStream is still null!");
0766: Assertion.affirm(b_in == null,
0767: "BufferedInputStream is still not null!");
0768: Assertion.affirm(bufferedReader == null,
0769: "BufferedReader is still not null!");
0770: }
0771: } else if (dataMode == DataMode.OBJECT) {
0772: if (dataType == DataType.IN) {
0773: //we will disable this for now
0774: throw new IllegalArgumentException(
0775: "Can't set DataType.IN mode to OBJECT when blocking mode is set as false!");
0776: }
0777:
0778: if (dataType == DataType.OUT) {
0779: dataModeOUT = dataMode;
0780: b_out = null;
0781: o_out = new ObjectOutputStream(out);
0782: Assertion.affirm(o_out != null,
0783: "ObjectOutputStream is still null!");
0784: } /*else if(dataType == DataType.IN) {
0785: dataModeIN = dataMode;
0786: b_in = null;
0787: bufferedReader = null;
0788: //registerForRead();
0789: o_in = new ObjectInputStream(in); //will block
0790: Assertion.affirm(o_in!=null, "ObjectInputStream is still null!");
0791: }*/
0792: } else if (dataMode == DataMode.BYTE
0793: || dataMode == DataMode.BINARY) {
0794: if (dataType == DataType.OUT) {
0795: if (dataModeOUT == DataMode.STRING
0796: || dataModeOUT == DataMode.BYTE
0797: || dataModeOUT == DataMode.BINARY) {
0798: dataModeOUT = dataMode;
0799: } else if (dataModeOUT == DataMode.OBJECT) {
0800: dataModeOUT = dataMode;
0801:
0802: o_out = null;
0803: b_out = new BufferedOutputStream(out);
0804: } else {
0805: Assertion.affirm(false,
0806: "Unknown DataType.OUT - DataMode: "
0807: + dataModeOUT);
0808: }
0809: Assertion.affirm(b_out != null,
0810: "BufferedOutputStream is still null!");
0811: } else if (dataType == DataType.IN) {
0812: dataModeIN = dataMode;
0813: o_in = null;
0814: bufferedReader = null;
0815: b_in = null;
0816: //input stream will work
0817: Assertion.affirm(in != null,
0818: "InputStream is still null!");
0819: } else {
0820: throw new IllegalArgumentException(
0821: "Unknown DataType : " + dataType);
0822: }
0823: } else {
0824: throw new IllegalArgumentException("Unknown DataMode : "
0825: + dataMode);
0826: }
0827: }
0828:
0829: protected byte[] readInputStream() throws IOException {
0830: return readInputStream(getInputStream());
0831: }
0832:
0833: public void updateInputOutputStreams() throws IOException {
0834: byteBufferOutputStream = new ByteBufferOutputStream(
0835: writeByteBuffer, this );
0836: setInputStream(new ByteBufferInputStream(readByteBuffer, this ,
0837: getCharset()));
0838: setOutputStream(byteBufferOutputStream);
0839: }
0840:
0841: public void setSocketChannel(SocketChannel socketChannel) {
0842: this .socketChannel = socketChannel;
0843: }
0844:
0845: public SocketChannel getSocketChannel() {
0846: return socketChannel;
0847: }
0848:
0849: public void setSelectionKey(SelectionKey selectionKey) {
0850: this .selectionKey = selectionKey;
0851: }
0852:
0853: public SelectionKey getSelectionKey() {
0854: if (selectionKey == null)
0855: selectionKey = getSocketChannel().keyFor(
0856: getServer().getSelector());
0857: return selectionKey;
0858: }
0859:
0860: private void processGotDataInBuffers() throws AppException,
0861: ConnectionLostException, ClassNotFoundException,
0862: IOException {
0863: if (getInputStream().available() == 0)
0864: return;
0865:
0866: logger.finest("Trying to process got data.. DataMode.IN="
0867: + dataModeIN);
0868: AuthStatus authStatus = null;
0869:
0870: //--For debug
0871: //((ByteBufferInputStream) getInputStream()).dumpContent();
0872:
0873: String temp = null;
0874: String rec = null;
0875: Object recObject = null;
0876: byte[] recByte = null;
0877:
0878: boolean timeToCheckForNewLineMiss = false;
0879:
0880: do {
0881: //updateLastCommunicationTime();
0882:
0883: if (dataModeIN == DataMode.STRING) {
0884: ByteBufferInputStream bbin = (ByteBufferInputStream) getInputStream();
0885: timeToCheckForNewLineMiss = true;
0886:
0887: while (bbin.isLineReady()) {
0888:
0889: rec = bbin.readLine();
0890: if (rec == null) {
0891: lost = true;
0892: return;
0893: }
0894: if (getCommunicationLogging() && authorised == true) {
0895: appLogger.fine("Got STRING ["
0896: + getHostAddress() + "] : " + rec);
0897: }
0898:
0899: if (authorised == false)
0900: authStatus = clientAuthenticationHandler
0901: .handleAuthentication(this , rec);
0902: else
0903: clientCommandHandler.handleCommand(this , rec);
0904:
0905: if (isClosed() == true)
0906: return;
0907:
0908: while (authStatus == AuthStatus.FAILURE)
0909: authStatus = processAuthorisation();
0910:
0911: if (authStatus == AuthStatus.SUCCESS)
0912: authorised = true;
0913:
0914: if (dataModeIN != DataMode.STRING) {
0915: break;
0916: }
0917:
0918: timeToCheckForNewLineMiss = false;
0919: }//end of while
0920:
0921: if (timeToCheckForNewLineMiss
0922: && bbin.availableOnlyInByteBuffer() == 0) {
0923: return;
0924: } else {
0925: timeToCheckForNewLineMiss = false;
0926: }
0927: }
0928:
0929: //} else if(dataModeIN == DataMode.OBJECT) {
0930: /*
0931: while(dataModeIN == DataMode.OBJECT && o_in!=null) {
0932: recObject = o_in.readObject();
0933: if(recObject==null) {
0934: lost = true;
0935: return;
0936: }
0937: if(getCommunicationLogging() && authorised == true) {
0938: appLogger.fine("Got OBJECT ["+getHostAddress()+"] : "+
0939: recObject.toString());
0940: }
0941:
0942:
0943: if(authorised == false)
0944: authStatus = clientAuthenticationHandler.handleAuthentication(this, recObject);
0945: else
0946: clientObjectHandler.handleObject(this, recObject);
0947:
0948: if(isClosed()==true) return;
0949:
0950: while(authStatus==AuthStatus.FAILURE)
0951: authStatus = processAuthorisation();
0952:
0953: if(authStatus==AuthStatus.SUCCESS)
0954: authorised = true;
0955: }
0956: */
0957:
0958: //} else if(dataModeIN == DataMode.BYTE) {
0959: while (dataModeIN == DataMode.BYTE
0960: && getInputStream().available() != 0) {
0961: rec = readBytes();
0962: if (rec == null) {
0963: lost = true;
0964: return;
0965: }
0966: if (getCommunicationLogging() && authorised == true) {
0967: appLogger.fine("Got BYTE [" + getHostAddress()
0968: + "] : " + rec);
0969: }
0970:
0971: if (authorised == false)
0972: authStatus = clientAuthenticationHandler
0973: .handleAuthentication(this , rec);
0974: else
0975: clientCommandHandler.handleCommand(this , rec);
0976:
0977: if (isClosed() == true)
0978: return;
0979:
0980: while (authStatus == AuthStatus.FAILURE)
0981: authStatus = processAuthorisation();
0982:
0983: if (authStatus == AuthStatus.SUCCESS)
0984: authorised = true;
0985: }
0986:
0987: //} else if(dataModeIN == DataMode.BINARY) {
0988: while (dataModeIN == DataMode.BINARY
0989: && getInputStream().available() != 0) {
0990: recByte = readBinary();
0991: if (recByte == null) {
0992: lost = true;
0993: return;
0994: }
0995: if (getCommunicationLogging() && authorised == true) {
0996: appLogger.fine("Got BINARY [" + getHostAddress()
0997: + "] : "
0998: + MyString.getMemInfo(recByte.length));
0999: }
1000:
1001: if (authorised == false)
1002: authStatus = clientAuthenticationHandler
1003: .handleAuthentication(this , recByte);
1004: else
1005: clientBinaryHandler.handleBinary(this , recByte);
1006:
1007: if (isClosed() == true)
1008: return;
1009:
1010: while (authStatus == AuthStatus.FAILURE)
1011: authStatus = processAuthorisation();
1012:
1013: if (authStatus == AuthStatus.SUCCESS)
1014: authorised = true;
1015: }
1016:
1017: //} else {
1018: if (dataModeIN != DataMode.STRING
1019: && dataModeIN != DataMode.OBJECT
1020: && dataModeIN != DataMode.BYTE
1021: && dataModeIN != DataMode.BINARY) {
1022: throw new IllegalStateException(
1023: "Incoming DataMode is not supported : "
1024: + dataModeIN);
1025: }
1026: } while (getInputStream().available() != 0);
1027: }
1028:
1029: public void registerForRead() throws IOException,
1030: ClosedChannelException {
1031: try {
1032: if (getSelectionKey() == null) {
1033: boolean flag = getServer().registerChannel(
1034: getSocketChannel(), SelectionKey.OP_READ, this );
1035: if (flag) {
1036: logger.finest("Adding OP_READ as interest Ops for "
1037: + getName());
1038: } else if (ByteBufferOutputStream
1039: .isLoggable(Level.FINEST)) {
1040: logger
1041: .finest("OP_READ is already present in interest Ops for "
1042: + getName());
1043: }
1044: } else if (getSelectionKey().isValid()) {
1045: if ((getSelectionKey().interestOps() & SelectionKey.OP_READ) == 0) {
1046: logger.finest("Adding OP_READ to interest Ops for "
1047: + getName());
1048: removeEvent(ClientEvent.READ);
1049: getSelectionKey().interestOps(
1050: getSelectionKey().interestOps()
1051: | SelectionKey.OP_READ);
1052: if (wakeupSelectorAfterRegisterRead) {
1053: getServer().getSelector().wakeup();
1054: }
1055: } else {
1056: if (ByteBufferOutputStream.isLoggable(Level.FINEST)) {
1057: logger
1058: .finest("OP_READ is already present in interest Ops for "
1059: + getName());
1060: }
1061: }
1062: } else {
1063: throw new IOException("SelectionKey is invalid!");
1064: }
1065: } catch (CancelledKeyException e) {
1066: throw new IOException("SelectionKey is cancelled!");
1067: }
1068: }
1069:
1070: public void registerForWrite() throws IOException,
1071: ClosedChannelException {
1072: if (hasEvent(ClientEvent.RUN_BLOCKING)
1073: || hasEvent(ClientEvent.MAX_CON_BLOCKING)) {
1074: throw new IllegalStateException(
1075: "This method is only allowed under Non-Blocking mode.");
1076: }
1077:
1078: if (clientWriteHandler == null) {
1079: throw new IllegalStateException(
1080: "ClientWriteHandler has not been set!");
1081: }
1082: registerWrite();
1083: }
1084:
1085: public void registerWrite() throws IOException {
1086: try {
1087: if (getSelectionKey() == null) {
1088: boolean flag = getServer()
1089: .registerChannel(getSocketChannel(),
1090: SelectionKey.OP_WRITE, this );
1091: if (flag) {
1092: logger
1093: .finest("Adding OP_WRITE as interest Ops for "
1094: + getName());
1095: } else if (ByteBufferOutputStream
1096: .isLoggable(Level.FINEST)) {
1097: logger
1098: .finest("OP_WRITE is already present in interest Ops for "
1099: + getName());
1100: }
1101: } else if (getSelectionKey().isValid()) {
1102: if ((getSelectionKey().interestOps() & SelectionKey.OP_WRITE) == 0) {
1103: logger
1104: .finest("Adding OP_WRITE to interest Ops for "
1105: + getName());
1106: removeEvent(ClientEvent.WRITE);
1107: getSelectionKey().interestOps(
1108: getSelectionKey().interestOps()
1109: | SelectionKey.OP_WRITE);
1110: if (wakeupSelectorAfterRegisterWrite) {
1111: getServer().getSelector().wakeup();
1112: }
1113: } else {
1114: if (ByteBufferOutputStream.isLoggable(Level.FINEST)) {
1115: logger
1116: .finest("OP_WRITE is already present in interest Ops for "
1117: + getName());
1118: }
1119: }
1120: } else {
1121: throw new IOException("SelectionKey is invalid!");
1122: }
1123: } catch (CancelledKeyException e) {
1124: throw new IOException("SelectionKey is cancelled!");
1125: }
1126: }
1127:
1128: protected void setClientWriteHandler(ClientWriteHandler handler) {
1129: clientWriteHandler = handler;
1130: }
1131:
1132: /**
1133: * Returns number of thread currently in this object.
1134: * @since 1.4.6
1135: */
1136: public int getThreadAccessCount() {
1137: return threadAccessCount;
1138: }
1139: }
|