Source Code Cross Referenced for NonBlockingClientHandler.java in » Net » QuickServer » org » quickserver » net » server » impl » Java Source Code / Java Documentation - Java Source Code and Java Documentation

Java Source Code / Java Documentation
1. 6.0 JDK Core
2. 6.0 JDK Modules
3. 6.0 JDK Modules com.sun
4. 6.0 JDK Modules com.sun.java
5. 6.0 JDK Modules sun
6. 6.0 JDK Platform
7. Ajax
8. Apache Harmony Java SE
9. Aspect oriented
10. Authentication Authorization
11. Blogger System
12. Build
13. Byte Code
14. Cache
15. Chart
16. Chat
17. Code Analyzer
18. Collaboration
19. Content Management System
20. Database Client
21. Database DBMS
22. Database JDBC Connection Pool
23. Database ORM
24. Development
25. EJB Server geronimo
26. EJB Server GlassFish
27. EJB Server JBoss 4.2.1
28. EJB Server resin 3.1.5
29. ERP CRM Financial
30. ESB
31. Forum
32. GIS
33. Graphic Library
34. Groupware
35. HTML Parser
36. IDE
37. IDE Eclipse
38. IDE Netbeans
39. Installer
40. Internationalization Localization
41. Inversion of Control
42. Issue Tracking
43. J2EE
44. JBoss
45. JMS
46. JMX
47. Library
48. Mail Clients
49. Net
50. Parser
51. PDF
52. Portal
53. Profiler
54. Project Management
55. Report
56. RSS RDF
57. Rule Engine
58. Science
59. Scripting
60. Search Engine
61. Security
62. Servlet Container
63. Source Control
64. Swing Library
65. Template Engine
66. Test Coverage
67. Testing
68. UML
69. Web Crawler
70. Web Framework
71. Web Mail
72. Web Server
73. Web Services
74. Web Services apache cxf 2.0.1
75. Web Services AXIS2
76. Wiki Engine
77. Workflow Engines
78. XML
79. XML UI
Java
Java Tutorial
Java Open Source
Jar File Download
Java Articles
Java Products
Java by API
Photoshop Tutorials
Maya Tutorials
Flash Tutorials
3ds-Max Tutorials
Illustrator Tutorials
GIMP Tutorials
C# / C Sharp
C# / CSharp Tutorial
C# / CSharp Open Source
ASP.Net
ASP.NET Tutorial
JavaScript DHTML
JavaScript Tutorial
JavaScript Reference
HTML / CSS
HTML CSS Reference
C / ANSI-C
C Tutorial
C++
C++ Tutorial
Ruby
PHP
Python
Python Tutorial
Python Open Source
SQL Server / T-SQL
SQL Server / T-SQL Tutorial
Oracle PL / SQL
Oracle PL/SQL Tutorial
PostgreSQL
SQL / MySQL
MySQL Tutorial
VB.Net
VB.Net Tutorial
Flash / Flex / ActionScript
VBA / Excel / Access / Word
XML
XML Tutorial
Microsoft Office PowerPoint 2007 Tutorial
Microsoft Office Excel 2007 Tutorial
Microsoft Office Word 2007 Tutorial
Java Source Code / Java Documentation » Net » QuickServer » org.quickserver.net.server.impl 
Source Cross Referenced  Class Diagram Java Document (Java Doc) 


0001:        /*
0002:         * This file is part of the QuickServer library 
0003:         * Copyright (C) QuickServer.org
0004:         *
0005:         * Use, modification, copying and distribution of this software is subject to
0006:         * the terms and conditions of the GNU Lesser General Public License. 
0007:         * You should have received a copy of the GNU LGP License along with this 
0008:         * library; if not, you can download a copy from <http://www.quickserver.org/>.
0009:         *
0010:         * For questions, suggestions, bug-reports, enhancement-requests etc.
0011:         * visit http://www.quickserver.org
0012:         *
0013:         */
0014:
0015:        package org.quickserver.net.server.impl;
0016:
0017:        import org.quickserver.net.server.*;
0018:        import org.quickserver.net.*;
0019:        import org.quickserver.util.*;
0020:        import org.quickserver.util.io.*;
0021:
0022:        import java.io.*;
0023:        import java.net.*;
0024:        import java.util.*;
0025:        import java.util.logging.*;
0026:
0027:        import java.nio.*;
0028:        import java.nio.channels.*;
0029:
0030:        public class NonBlockingClientHandler extends BasicClientHandler {
0031:            private static final Logger logger = Logger
0032:                    .getLogger(NonBlockingClientHandler.class.getName()); // logger named after this class
0033:
                 // Assigned in handleClient() from the TheClient passed in; reset to null by clean().
0034:            protected ClientWriteHandler clientWriteHandler; //v1.4.5
                 // Channel of the current client: assigned in handleClient(), read from in
                 // processRead(), closed by closeConnection()/run(), reset to null by clean().
0035:            private SocketChannel socketChannel;
0036:
                 // processRead() appends flipped ByteBuffers here; clean() hands whatever is
                 // left back to getServer().getByteBufferPool().
0037:            protected ArrayList readByteBuffer = new ArrayList();
                 // clean() returns the entries to the same pool.
                 // NOTE(review): presumably holds ByteBuffers queued for writing - the code that
                 // fills this list is not in the visible part of the file, confirm there.
0038:            protected ArrayList writeByteBuffer = new ArrayList();
0039:
                 // Cancelled and reset to null by clean().
0040:            protected SelectionKey selectionKey;
0041:
                 // Incremented by run() while holding the lock on this handler, compared with
                 // maxThreadAccessCount there, and forced back to 0 by clean().
0042:            protected volatile int threadAccessCount = 0;
                 // When true, run() returns right after its lock check. Set by
                 // checkReturnClientHandler(), cleared by clean().
0043:            protected volatile boolean willReturn;
                 // Set by closeConnection() while it still waits for pending output;
                 // a second closeConnection() call returns immediately while this is set.
0044:            protected volatile boolean waitingForFinalWrite;
0045:
                 // Limit checked in run(); -1 switches the check off.
0046:            private static int maxThreadAccessCount = 3; //one for each event ACCEPT, WRITE, READ
                 // Flags exposed through the static getters/setters below; the code that reads
                 // them is not in the visible part of the file.
0047:            private static boolean wakeupSelectorAfterRegisterWrite = true;
0048:            private static boolean wakeupSelectorAfterRegisterRead = true;
0049:
0050:            /**
0051:             * Sets the flag to wakeup Selector After RegisterForWrite is called.
0052:             * @since 1.4.7
0053:             */
0054:            public static void setWakeupSelectorAfterRegisterWrite(boolean flag) {
0055:                wakeupSelectorAfterRegisterWrite = flag;
0056:            }
0057:
0058:            /**
0059:             * Returns wakeupSelectorAfterRegisterWrite the flag that controls if wakeup is called on Selector
0060:             * after RegisterForWrite is called. 
0061:             * @since 1.4.7
0062:             */
0063:            public static boolean getWakeupSelectorAfterRegisterWrite() {
0064:                return wakeupSelectorAfterRegisterWrite;
0065:            }
0066:
0067:            /**
0068:             * Sets the flag to wakeup Selector After RegisterForRead is called.
0069:             * @since 1.4.7
0070:             */
0071:            public static void setWakeupSelectorAfterRegisterRead(boolean flag) {
0072:                wakeupSelectorAfterRegisterRead = flag;
0073:            }
0074:
0075:            /**
0076:             * Returns wakeupSelectorAfterRegisterRead the flag that controls if wakeup is called on Selector
0077:             * after RegisterForRead is called. 
0078:             * @since 1.4.7
0079:             */
0080:            public static boolean getWakeupSelectorAfterRegisterRead() {
0081:                return wakeupSelectorAfterRegisterRead;
0082:            }
0083:
0084:            /**
0085:             * Sets the maximum count of thread allowed to run objects of this class at a time.
0086:             * @since 1.4.7
0087:             */
0088:            public static void setMaxThreadAccessCount(int count) {
0089:                if (count < 3 && count != -1)
0090:                    throw new IllegalArgumentException(
0091:                            "Value should be >=3 or -1");
0092:                maxThreadAccessCount = count;
0093:            }
0094:
0095:            /**
0096:             * Returns the maximum count of thread allowed to run objects of this class at a time.
0097:             * @since 1.4.7
0098:             */
0099:            public static int getMaxThreadAccessCount() {
0100:                return maxThreadAccessCount;
0101:            }
0102:
0103:            //v1.4.7 - stream that waitTillFullyWritten() asks for pending output and that
                 // closeConnection()/run() call forceNotify() on; closed and reset to null by clean().
                 // NOTE(review): the assignment is not in the visible part of the file.
0104:            private ByteBufferOutputStream byteBufferOutputStream;
0105:
/**
 * Creates a handler and passes <code>instanceCount</code> straight on
 * to the <code>BasicClientHandler</code> constructor.
 *
 * @param instanceCount value forwarded to the superclass
 */
public NonBlockingClientHandler(int instanceCount) {
    super(instanceCount);
}
0109:
/**
 * Creates a handler using the no-argument
 * <code>BasicClientHandler</code> constructor.
 */
public NonBlockingClientHandler() {
    super();
}
0113:
0114:            public void clean() {
0115:                logger.finest("Starting clean - " + getName());
0116:                if (threadAccessCount != 0) {
0117:                    logger.warning("Thread Access Count was not 0!: "
0118:                            + threadAccessCount);
0119:                    if (Assertion.isEnabled()) {
0120:                        assertionSystemExit();
0121:                    }
0122:                    threadAccessCount = 0;
0123:                }
0124:
0125:                while (readByteBuffer.isEmpty() == false) {
0126:                    try {
0127:                        getServer().getByteBufferPool().returnObject(
0128:                                readByteBuffer.remove(0));
0129:                    } catch (Exception er) {
0130:                        appLogger
0131:                                .warning("Error in returning read ByteBuffer to pool: "
0132:                                        + er);
0133:                        break;
0134:                    }
0135:                }
0136:
0137:                while (writeByteBuffer.isEmpty() == false) {
0138:                    try {
0139:                        getServer().getByteBufferPool().returnObject(
0140:                                writeByteBuffer.remove(0));
0141:                    } catch (Exception er) {
0142:                        appLogger
0143:                                .warning("Error in returning write ByteBuffer to pool: "
0144:                                        + er);
0145:                        break;
0146:                    }
0147:                }
0148:
0149:                if (selectionKey != null) {
0150:                    selectionKey.cancel();
0151:                    selectionKey.selector().wakeup();
0152:                    selectionKey = null;
0153:                }
0154:                willReturn = false;
0155:                waitingForFinalWrite = false;
0156:                socketChannel = null;
0157:                if (byteBufferOutputStream != null) {
0158:                    byteBufferOutputStream.close();
0159:                }
0160:
0161:                super .clean();
0162:
0163:                clientWriteHandler = null;//1.4.5		
0164:                byteBufferOutputStream = null;
0165:
0166:                logger.finest("Finished clean - " + getName());
0167:            }
0168:
0169:            protected void finalize() throws Throwable {
0170:                clean();
0171:                super .finalize();
0172:            }
0173:
0174:            public void handleClient(TheClient theClient) {
0175:                super .handleClient(theClient);
0176:                setClientWriteHandler(theClient.getClientWriteHandler()); //v1.4.5
0177:                setSocketChannel(theClient.getSocketChannel());//1.4.5
0178:            }
0179:
0180:            protected void setInputStream(InputStream in) throws IOException {
0181:                this .in = in;
0182:                if (getDataMode(DataType.IN) == DataMode.STRING) {
0183:                    b_in = null;
0184:                    o_in = null;
0185:                    bufferedReader = null;
0186:                } else if (getDataMode(DataType.IN) == DataMode.OBJECT) {
0187:                    b_in = null;
0188:                    bufferedReader = null;
0189:                    o_in = new ObjectInputStream(in);
0190:                } else if (getDataMode(DataType.IN) == DataMode.BYTE
0191:                        || getDataMode(DataType.IN) == DataMode.BINARY) {
0192:                    o_in = null;
0193:                    bufferedReader = null;
0194:                    b_in = null;
0195:                }
0196:            }
0197:
0198:            public BufferedReader getBufferedReader() {
0199:                throw new IllegalStateException(
0200:                        "Access to BufferedReader in not allowed in Non-Blocking mode!");
0201:            }
0202:
0203:            public void closeConnection() {
0204:                synchronized (this ) {
0205:                    if (connection == false)
0206:                        return;
0207:                    if (waitingForFinalWrite)
0208:                        return;
0209:                    if (getSelectionKey() != null
0210:                            && getSelectionKey().isValid() && lost == false) {
0211:                        waitingForFinalWrite = true;
0212:                    } else {
0213:                        connection = false;
0214:                    }
0215:                }
0216:
0217:                try {
0218:                    if (getSocketChannel() != null && socket != null) {
0219:                        if (waitingForFinalWrite) {
0220:                            try {
0221:                                waitTillFullyWritten();
0222:                            } catch (Exception error) {
0223:                                logger
0224:                                        .warning("Error in waitingForFinalWrite : "
0225:                                                + error);
0226:                                if (logger.isLoggable(Level.FINE)) {
0227:                                    logger.fine("StackTrace:\n"
0228:                                            + MyString.getStackTrace(error));
0229:                                }
0230:                            } finally {
0231:                                connection = false;
0232:                                byteBufferOutputStream.forceNotify();
0233:                                getSelectionKey().cancel();
0234:                            }
0235:                        }//end of waitingForFinalWrite
0236:
0237:                        synchronized (this ) {
0238:                            if (hasEvent(ClientEvent.MAX_CON) == false) {
0239:                                notifyCloseOrLost();
0240:                            }
0241:                            if (getSocketChannel().isOpen()) {
0242:                                logger.finest("Closing SocketChannel");
0243:                                getSocketChannel().close();
0244:                            }
0245:                        }
0246:                    }
0247:                    if (getServer() != null) {
0248:                        getServer().getSelector().wakeup();
0249:                    }
0250:                } catch (IOException e) {
0251:                    logger.warning("Error in closeConnection : " + e);
0252:                    if (logger.isLoggable(Level.FINE)) {
0253:                        logger
0254:                                .fine("StackTrace:\n"
0255:                                        + MyString.getStackTrace(e));
0256:                    }
0257:                } catch (NullPointerException npe) {
0258:                    logger.fine("NullPointerException: " + npe);
0259:                    if (logger.isLoggable(Level.FINE)) {
0260:                        logger.fine("StackTrace:\n"
0261:                                + MyString.getStackTrace(npe));
0262:                    }
0263:                }
0264:            }
0265:
0266:            /**
0267:             * waitTillFullyWritten
0268:             * @since 1.4.7
0269:             */
0270:            public void waitTillFullyWritten() {
0271:                Object waitLock = new Object();
0272:                if (byteBufferOutputStream.isDataAvailableForWrite(waitLock)) {
0273:                    if (ByteBufferOutputStream.isLoggable(Level.FINEST)) {
0274:                        logger.finest("Waiting " + getName());
0275:                    }
0276:                    try {
0277:                        synchronized (waitLock) {
0278:                            waitLock.wait(1000 * 60 * 2);//2 min max
0279:                        }
0280:                    } catch (InterruptedException ie) {
0281:                        logger.warning("Error: " + ie);
0282:                    }
0283:                    if (ByteBufferOutputStream.isLoggable(Level.FINEST)) {
0284:                        logger.finest("Done. " + getName());
0285:                    }
0286:                }
0287:            }
0288:
0289:            public void run() {
                     /*
                      * Handles one queued ClientEvent on the calling thread.
                      *
                      * The first entry of unprocessedClientEvents is taken, published through
                      * threadEvent and dispatched:
                      *   ACCEPT / MAX_CON - prepareForRun() is called first;
                      *   MAX_CON          - processMaxConnection(), then falls through to teardown;
                      *   ACCEPT           - registerForRead(), gotConnected(), authorisation,
                      *                      then returnThread() and return;
                      *   READ / WRITE     - processRead() / processWrite(); a true result means
                      *                      the thread must simply return.
                      * Whenever none of these returns early, the rest of the method tears the
                      * connection down: close/lost notification, closeConnection(), cancel key,
                      * close socket and channel, returnClientData(), returnThread() and - if
                      * checkReturnClientHandler() allows it - returnClientHandler().
                      */
0290:                if (unprocessedClientEvents.size() == 0) {
0291:                    logger.finest("No unprocessed ClientEvents!");
0292:                    return;
0293:                }
0294:
                     // A handler that is already on its way back (willReturn) must not be
                     // entered again; otherwise count this thread as working on the handler.
0295:                synchronized (this ) {
0296:                    if (willReturn) {
0297:                        return;
0298:                    } else {
0299:                        threadAccessCount++;
0300:                    }
0301:                }
0302:
                     // Take the oldest queued event.
0303:                ClientEvent currentEvent = (ClientEvent) unprocessedClientEvents
0304:                        .remove(0);
0305:
0306:                if (logger.isLoggable(Level.FINEST)) {
0307:                    StringBuffer sb = new StringBuffer();
0308:                    sb.append("Running ").append(getName());
0309:                    sb.append(" using ");
0310:                    sb.append(Thread.currentThread().getName());
0311:                    sb.append(" for ");
0312:
0313:                    synchronized (clientEvents) {
0314:                        if (clientEvents.size() > 1) {
0315:                            sb.append(currentEvent + ", Current Events - "
0316:                                    + clientEvents);
0317:                        } else {
0318:                            sb.append(currentEvent);
0319:                        }
0320:                    }
0321:                    logger.finest(sb.toString());
0322:                }
0323:
                     // NOTE(review): threadAccessCount was incremented above, but this path
                     // returns without calling returnThread() - confirm the count is balanced
                     // elsewhere.
0324:                if (currentEvent == null) {
0325:                    threadEvent.set(null);
0326:                    return;
0327:                } else {
0328:                    threadEvent.set(currentEvent);
0329:                }
0330:
0331:                try {
                         // A limit of -1 disables this check.
                         // NOTE(review): same question as above - this return does not call
                         // returnThread() either.
0332:                    if (maxThreadAccessCount != -1
0333:                            && threadAccessCount > maxThreadAccessCount) {
0334:                        logger.warning("ThreadAccessCount can't go beyond "
0335:                                + maxThreadAccessCount + ": "
0336:                                + threadAccessCount);
0337:                        if (Assertion.isEnabled()) {
0338:                            throw new AssertionError(
0339:                                    "ThreadAccessCount can't go beyond "
0340:                                            + maxThreadAccessCount + ": "
0341:                                            + threadAccessCount);
0342:                        }
0343:                        return;
0344:                    }
0345:
0346:                    if (socket == null)
0347:                        throw new SocketException("Socket was null!");
0348:
0349:                    if (getThreadEvent() == ClientEvent.ACCEPT
0350:                            || getThreadEvent() == ClientEvent.MAX_CON) {
0351:                        prepareForRun();
0352:                        Assertion.affirm(willReturn == false,
0353:                                "WillReturn has to be false!: " + willReturn);
0354:                    }
0355:
0356:                    if (getThreadEvent() == ClientEvent.MAX_CON) {
0357:                        processMaxConnection(currentEvent);
0358:                    }
0359:
0360:                    try {
                             // New connection: register for reads, announce it and run the
                             // authorisation step if one is configured, then give the thread back.
0361:                        if (getThreadEvent() == ClientEvent.ACCEPT) {
0362:                            registerForRead();
0363:                            clientEventHandler.gotConnected(this );
0364:
0365:                            if (authorised == false) {
0366:                                if (clientAuthenticationHandler == null
0367:                                        && authenticator == null) {
0368:                                    authorised = true;
0369:                                    logger.finest("No Authenticator "
0370:                                            + getName() + " so return thread.");
0371:                                } else {
0372:                                    if (clientAuthenticationHandler != null) {
0373:                                        AuthStatus authStatus = null;
                                             // repeat for as long as the result is FAILURE
0374:                                        do {
0375:                                            authStatus = processAuthorisation();
0376:                                        } while (authStatus == AuthStatus.FAILURE);
0377:
0378:                                        if (authStatus == AuthStatus.SUCCESS)
0379:                                            authorised = true;
0380:                                    } else {
0381:                                        processAuthorisation();
0382:                                    }
0383:                                    if (authorised)
0384:                                        logger.finest("Authentication done "
0385:                                                + getName()
0386:                                                + ", so return thread.");
0387:                                    else
0388:                                        logger
0389:                                                .finest("askAuthentication() done "
0390:                                                        + getName()
0391:                                                        + ", so return thread.");
0392:                                }
0393:                            }//end authorised
0394:                            returnThread(); //return thread to pool
0395:                            return;
0396:                        }
0397:
0398:                        if (connection && getThreadEvent() == ClientEvent.READ) {
0399:                            if (processRead())
0400:                                return;
0401:                        }
0402:
0403:                        if (connection && getThreadEvent() == ClientEvent.WRITE) {
0404:                            if (processWrite())
0405:                                return;
0406:                        }
0407:
                         // Every failure below is logged. All handlers except the ones for
                         // AppException and AssertionError also mark the connection as lost.
0408:                    } catch (SocketException e) {
0409:                        appLogger.finest("SocketException - Client ["
0410:                                + getHostAddress() + "]: " + e.getMessage());
0411:                        //e.printStackTrace();
0412:                        lost = true;
0413:                    } catch (AppException e) {
0414:                        //errors from Application
0415:                        appLogger.finest("AppException "
0416:                                + Thread.currentThread().getName() + ": "
0417:                                + e.getMessage());
0418:                    } catch (javax.net.ssl.SSLException e) {
0419:                        lost = true;
0420:                        if (Assertion.isEnabled()) {
0421:                            appLogger.info("SSLException - Client ["
0422:                                    + getHostAddress() + "] "
0423:                                    + Thread.currentThread().getName() + ": "
0424:                                    + e);
0425:                        } else {
0426:                            appLogger.warning("SSLException - Client ["
0427:                                    + getHostAddress() + "]: " + e);
0428:                        }
0429:                    } catch (ConnectionLostException e) {
0430:                        lost = true;
0431:                        if (e.getMessage() != null)
0432:                            appLogger.finest("Connection lost "
0433:                                    + Thread.currentThread().getName() + ": "
0434:                                    + e.getMessage());
0435:                        else
0436:                            appLogger.finest("Connection lost "
0437:                                    + Thread.currentThread().getName());
0438:                    } catch (ClosedChannelException e) {
0439:                        lost = true;
0440:                        appLogger.finest("Channel closed "
0441:                                + Thread.currentThread().getName() + ": " + e);
0442:                    } catch (IOException e) {
0443:                        lost = true;
0444:                        appLogger.fine("IOError "
0445:                                + Thread.currentThread().getName() + ": " + e);
0446:                    } catch (AssertionError er) {
0447:                        logger.warning("[AssertionError] " + getName() + " "
0448:                                + er);
0449:                        if (logger.isLoggable(Level.FINEST)) {
0450:                            logger.finest("StackTrace "
0451:                                    + Thread.currentThread().getName() + ": "
0452:                                    + MyString.getStackTrace(er));
0453:                        }
0454:                        assertionSystemExit();
0455:                    } catch (Error er) {
0456:                        logger.warning("[Error] " + er);
0457:                        if (logger.isLoggable(Level.FINEST)) {
0458:                            logger.finest("StackTrace "
0459:                                    + Thread.currentThread().getName() + ": "
0460:                                    + MyString.getStackTrace(er));
0461:                        }
0462:                        if (Assertion.isEnabled()) {
0463:                            assertionSystemExit();
0464:                        }
0465:                        lost = true;
0466:                    } catch (RuntimeException re) {
0467:                        logger.warning("[RuntimeException] "
0468:                                + MyString.getStackTrace(re));
0469:                        if (Assertion.isEnabled()) {
0470:                            assertionSystemExit();
0471:                        }
0472:                        lost = true;
0473:                    }
0474:
                         // Reached only when no branch above returned: the connection is
                         // finished (closed, lost, or refused because of MAX_CON).
0475:                    if (getThreadEvent() != ClientEvent.MAX_CON) {
0476:                        notifyCloseOrLost();
0477:                    }
0478:
0479:                    if (connection) {
0480:                        logger.finest(Thread.currentThread().getName()
0481:                                + " calling closeConnection()");
0482:                        closeConnection();
0483:                    }
0484:
                         // Still marked connected although lost while a final write is
                         // being waited for: call forceNotify() on the output stream.
0485:                    if (connection == true && lost == true
0486:                            && waitingForFinalWrite) {
0487:                        byteBufferOutputStream.forceNotify();
0488:                    }
0489:                } catch (javax.net.ssl.SSLException se) {
0490:                    logger.warning("SSLException "
0491:                            + Thread.currentThread().getName() + " - " + se);
0492:                } catch (IOException ie) {
0493:                    logger.warning("IOError "
0494:                            + Thread.currentThread().getName()
0495:                            + " - Closing Client : " + ie);
0496:                } catch (RuntimeException re) {
0497:                    logger.warning("[RuntimeException] " + getName() + " "
0498:                            + Thread.currentThread().getName() + " - "
0499:                            + MyString.getStackTrace(re));
0500:                    if (Assertion.isEnabled()) {
0501:                        assertionSystemExit();
0502:                    }
0503:                } catch (Exception e) {
0504:                    logger.warning("Error " + Thread.currentThread().getName()
0505:                            + " - Event:" + getThreadEvent() + " - Socket:"
0506:                            + socket + " : " + e);
0507:                    logger.fine("StackTrace: " + getName() + "\n"
0508:                            + MyString.getStackTrace(e));
0509:                    if (Assertion.isEnabled()) {
0510:                        assertionSystemExit();
0511:                    }
0512:                } catch (Error e) {
0513:                    logger.warning("Error " + Thread.currentThread().getName()
0514:                            + " - Event:" + getThreadEvent() + " - Socket:"
0515:                            + socket + " : " + e);
0516:                    logger.fine("StackTrace: " + getName() + "\n"
0517:                            + MyString.getStackTrace(e));
0518:                    if (Assertion.isEnabled()) {
0519:                        assertionSystemExit();
0520:                    }
0521:                }
0522:
                     // Teardown: release key, socket and channel; failures are only logged.
0523:                synchronized (this ) {
0524:                    try {
0525:                        if (getSelectionKey() != null
0526:                                && getSelectionKey().isValid()) {
0527:                            logger.finest("Canceling SelectionKey");
0528:                            getSelectionKey().cancel();
0529:                        }
0530:
0531:                        if (socket != null && socket.isClosed() == false) {
0532:                            logger.finest("Closing Socket");
0533:                            socket.close();
0534:                        }
0535:
0536:                        if (getSocketChannel() != null
0537:                                && getSocketChannel().isOpen()) {
0538:                            logger.finest("Closing SocketChannel");
0539:                            socketChannel.close();
0540:                        }
0541:                    } catch (Exception re) {
0542:                        logger.warning("Error closing Socket/Channel: " + re);
0543:                    }
0544:                }//end synchronized
0545:
0546:                willClean = true;
0547:                returnClientData();
0548:
                     // checkReturnClientHandler() flips willReturn at most once, so only
                     // one caller gets true here and hands the handler back.
0549:                boolean returnClientHandler = false;
0550:                synchronized (lockObj) {
0551:                    returnThread();
0552:                    returnClientHandler = checkReturnClientHandler();
0553:                }
0554:
0555:                if (returnClientHandler) {
0556:                    returnClientHandler(); //return to pool
0557:                }
0558:            }
0559:
0560:            protected boolean checkReturnClientHandler() {
0561:                if (willReturn == false) {
0562:                    willReturn = true;
0563:                    return true;
0564:                }
0565:                return false;
0566:            }
0567:
0568:            /**
0569:             * Process read
0570:             * @return value indicates if the thread should return form run()
0571:             */
0572:            private boolean processRead() throws Exception, AppException {
0573:                int count = 0;
0574:                int fullCount = 0;
0575:                ByteBuffer buffer = (ByteBuffer) getServer()
0576:                        .getByteBufferPool().borrowObject();
0577:
0578:                while (true) {
0579:                    try {
0580:                        count = getSocketChannel().read(buffer);
0581:                        if (count <= 0) {
0582:                            //logger.finest("SocketChannel read was "+count+"!");
0583:                            getServer().getByteBufferPool()
0584:                                    .returnObject(buffer);
0585:                            buffer = null;
0586:                            break;
0587:                        } else {
0588:                            fullCount += count;
0589:                        }
0590:
0591:                        buffer.flip(); // Make readable
0592:                        readByteBuffer.add(buffer);
0593:
0594:                        buffer = (ByteBuffer) getServer().getByteBufferPool()
0595:                                .borrowObject();
0596:                    } catch (Exception error) {
0597:                        logger.finest("Error in data read: " + error);
0598:                        lost = true;
0599:                        synchronized (getInputStream()) {
0600:                            getInputStream().notifyAll();
0601:                        }
0602:                        throw error;
0603:                    } finally {
0604:                        if (buffer != null && count <= 0) {
0605:                            getServer().getByteBufferPool()
0606:                                    .returnObject(buffer);
0607:                            buffer = null;
0608:                        }
0609:                    }
0610:                }//end while
0611:
0612:                if (count < 0) {
0613:                    logger.finest("SocketChannel read was " + count + "!");
0614:                    lost = true;
0615:                    synchronized (getInputStream()) {
0616:                        getInputStream().notifyAll();
0617:                    }
0618:                } else {
0619:                    logger.finest(fullCount + " bytes read");
0620:                    if (fullCount != 0) {
0621:                        updateLastCommunicationTime();
0622:                        synchronized (getInputStream()) {
0623:                            getInputStream().notify(); //if any are waiting
0624:                        }
0625:                        if (hasEvent(ClientEvent.ACCEPT) == false) {
0626:                            processGotDataInBuffers();
0627:                        }
0628:                    }
0629:
0630:                    //check if any data was read but not yet processed
0631:                    while (getInputStream().available() > 0) {
0632:                        logger.finest("Sending again for processing...");
0633:                        if (hasEvent(ClientEvent.ACCEPT) == false) {
0634:                            processGotDataInBuffers();
0635:                            break;
0636:                        } else {
0637:                            synchronized (getInputStream()) {
0638:                                getInputStream().notifyAll();
0639:                            }
0640:                            Thread.sleep(100);
0641:                        }
0642:                    }
0643:
0644:                    if (connection) {
0645:                        registerForRead();
0646:                        //getSelectionKey().selector().wakeup();
0647:                        returnThread(); //return to pool
0648:                        return true;
0649:                    }
0650:                }//end of else
0651:                logger
0652:                        .finest("We don't have connection, lets return all resources.");
0653:                return false;
0654:            }
0655:
0656:            /**
0657:             * Process write
0658:             * @return value indicates if the thread should return form run()
0659:             */
0660:            private boolean processWrite() throws IOException {
0661:                updateLastCommunicationTime();
0662:
0663:                boolean flag = byteBufferOutputStream.writeAllByteBuffer();
0664:
0665:                if (flag == false) {
0666:                    registerWrite();
0667:                } else if (/*flag==true && */clientWriteHandler != null) {
0668:                    clientWriteHandler.handleWrite(this );
0669:                }
0670:
0671:                if (connection) {
0672:                    returnThread(); //return to pool
0673:                    return true;
0674:                } else {
0675:                    logger
0676:                            .finest("We don't have connection, lets return all resources.");
0677:                }
0678:                return false;
0679:            }
0680:
0681:            protected void returnThread() {
0682:                threadAccessCount--;
0683:                Assertion.affirm(threadAccessCount >= 0,
0684:                        "ThreadAccessCount went less the 0! Value: "
0685:                                + threadAccessCount);
0686:                //return is done at ClientThread end
0687:                removeEvent((ClientEvent) threadEvent.get());
0688:            }
0689:
0690:            protected void returnClientHandler() {
0691:                logger.finest(getName());
0692:                try {
0693:                    for (int i = 0; threadAccessCount != 0; i++) {
0694:                        if (i == 100) {
0695:                            logger
0696:                                    .warning("ClientHandler must have got into a loop waiting for thread to free up! ThreadAccessCount="
0697:                                            + threadAccessCount);
0698:                            threadAccessCount = 0;
0699:                            if (Assertion.isEnabled()) {
0700:                                assertionSystemExit();
0701:                            } else {
0702:                                break;
0703:                            }
0704:                        }
0705:                        if (threadAccessCount <= 0)
0706:                            break;
0707:
0708:                        logger.finest("Waiting for other thread of "
0709:                                + getName() + " to finish");
0710:                        Thread.sleep(60);
0711:                    }
0712:                } catch (InterruptedException ie) {
0713:                    appLogger.warning("InterruptedException: " + ie);
0714:                }
0715:                super .returnClientHandler();
0716:            }
0717:
0718:            public void setDataMode(DataMode dataMode, DataType dataType)
0719:                    throws IOException {
0720:                if (getDataMode(dataType) == dataMode)
0721:                    return;
0722:
0723:                appLogger.fine("Setting Type:" + dataType + ", Mode:"
0724:                        + dataMode);
0725:                super .checkDataModeSet(dataMode, dataType);
0726:
0727:                setDataModeNonBlocking(dataMode, dataType);
0728:            }
0729:
0730:            private void setDataModeNonBlocking(DataMode dataMode,
0731:                    DataType dataType) throws IOException {
0732:                logger.finest("ENTER");
0733:                if (dataMode == DataMode.STRING) {
0734:                    if (dataType == DataType.OUT) {
0735:                        if (dataModeOUT == DataMode.BYTE
0736:                                || dataModeOUT == DataMode.BINARY) {
0737:                            dataModeOUT = dataMode;
0738:                        } else if (dataModeOUT == DataMode.OBJECT) {
0739:                            dataModeOUT = dataMode;
0740:                            o_out.flush();
0741:                            o_out = null;
0742:                            b_out = new BufferedOutputStream(out);
0743:                        } else {
0744:                            Assertion.affirm(false,
0745:                                    "Unknown DataType.OUT DataMode - "
0746:                                            + dataModeOUT);
0747:                        }
0748:                        Assertion.affirm(b_out != null,
0749:                                "BufferedOutputStream is still null!");
0750:                        Assertion.affirm(o_out == null,
0751:                                "ObjectOutputStream is still not null!");
0752:                    } else if (dataType == DataType.IN) {
0753:                        dataModeIN = dataMode;
0754:
0755:                        if (o_in != null) {
0756:                            if (o_in.available() != 0)
0757:                                logger
0758:                                        .warning("Data looks to be present in ObjectInputStream");
0759:                            o_in = null;
0760:                        }
0761:                        b_in = null;
0762:                        bufferedReader = null;
0763:                        //input stream will work
0764:                        Assertion.affirm(in != null,
0765:                                "InputStream is still null!");
0766:                        Assertion.affirm(b_in == null,
0767:                                "BufferedInputStream is still not null!");
0768:                        Assertion.affirm(bufferedReader == null,
0769:                                "BufferedReader is still not null!");
0770:                    }
0771:                } else if (dataMode == DataMode.OBJECT) {
0772:                    if (dataType == DataType.IN) {
0773:                        //we will disable this for now
0774:                        throw new IllegalArgumentException(
0775:                                "Can't set DataType.IN mode to OBJECT when blocking mode is set as false!");
0776:                    }
0777:
0778:                    if (dataType == DataType.OUT) {
0779:                        dataModeOUT = dataMode;
0780:                        b_out = null;
0781:                        o_out = new ObjectOutputStream(out);
0782:                        Assertion.affirm(o_out != null,
0783:                                "ObjectOutputStream is still null!");
0784:                    } /*else if(dataType == DataType.IN) {
0785:                    			dataModeIN = dataMode;
0786:                    			b_in = null;
0787:                    			bufferedReader = null;
0788:                    			//registerForRead();
0789:                    			o_in = new ObjectInputStream(in); //will block	
0790:                    			Assertion.affirm(o_in!=null, "ObjectInputStream is still null!");
0791:                    		}*/
0792:                } else if (dataMode == DataMode.BYTE
0793:                        || dataMode == DataMode.BINARY) {
0794:                    if (dataType == DataType.OUT) {
0795:                        if (dataModeOUT == DataMode.STRING
0796:                                || dataModeOUT == DataMode.BYTE
0797:                                || dataModeOUT == DataMode.BINARY) {
0798:                            dataModeOUT = dataMode;
0799:                        } else if (dataModeOUT == DataMode.OBJECT) {
0800:                            dataModeOUT = dataMode;
0801:
0802:                            o_out = null;
0803:                            b_out = new BufferedOutputStream(out);
0804:                        } else {
0805:                            Assertion.affirm(false,
0806:                                    "Unknown DataType.OUT - DataMode: "
0807:                                            + dataModeOUT);
0808:                        }
0809:                        Assertion.affirm(b_out != null,
0810:                                "BufferedOutputStream is still null!");
0811:                    } else if (dataType == DataType.IN) {
0812:                        dataModeIN = dataMode;
0813:                        o_in = null;
0814:                        bufferedReader = null;
0815:                        b_in = null;
0816:                        //input stream will work
0817:                        Assertion.affirm(in != null,
0818:                                "InputStream is still null!");
0819:                    } else {
0820:                        throw new IllegalArgumentException(
0821:                                "Unknown DataType : " + dataType);
0822:                    }
0823:                } else {
0824:                    throw new IllegalArgumentException("Unknown DataMode : "
0825:                            + dataMode);
0826:                }
0827:            }
0828:
0829:            protected byte[] readInputStream() throws IOException {
0830:                return readInputStream(getInputStream());
0831:            }
0832:
0833:            public void updateInputOutputStreams() throws IOException {
0834:                byteBufferOutputStream = new ByteBufferOutputStream(
0835:                        writeByteBuffer, this );
0836:                setInputStream(new ByteBufferInputStream(readByteBuffer, this ,
0837:                        getCharset()));
0838:                setOutputStream(byteBufferOutputStream);
0839:            }
0840:
0841:            public void setSocketChannel(SocketChannel socketChannel) {
0842:                this .socketChannel = socketChannel;
0843:            }
0844:
0845:            public SocketChannel getSocketChannel() {
0846:                return socketChannel;
0847:            }
0848:
0849:            public void setSelectionKey(SelectionKey selectionKey) {
0850:                this .selectionKey = selectionKey;
0851:            }
0852:
0853:            public SelectionKey getSelectionKey() {
0854:                if (selectionKey == null)
0855:                    selectionKey = getSocketChannel().keyFor(
0856:                            getServer().getSelector());
0857:                return selectionKey;
0858:            }
0859:
0860:            private void processGotDataInBuffers() throws AppException,
0861:                    ConnectionLostException, ClassNotFoundException,
0862:                    IOException {
0863:                if (getInputStream().available() == 0)
0864:                    return;
0865:
0866:                logger.finest("Trying to process got data.. DataMode.IN="
0867:                        + dataModeIN);
0868:                AuthStatus authStatus = null;
0869:
0870:                //--For debug
0871:                //((ByteBufferInputStream) getInputStream()).dumpContent();
0872:
0873:                String temp = null;
0874:                String rec = null;
0875:                Object recObject = null;
0876:                byte[] recByte = null;
0877:
0878:                boolean timeToCheckForNewLineMiss = false;
0879:
0880:                do {
0881:                    //updateLastCommunicationTime();
0882:
0883:                    if (dataModeIN == DataMode.STRING) {
0884:                        ByteBufferInputStream bbin = (ByteBufferInputStream) getInputStream();
0885:                        timeToCheckForNewLineMiss = true;
0886:
0887:                        while (bbin.isLineReady()) {
0888:
0889:                            rec = bbin.readLine();
0890:                            if (rec == null) {
0891:                                lost = true;
0892:                                return;
0893:                            }
0894:                            if (getCommunicationLogging() && authorised == true) {
0895:                                appLogger.fine("Got STRING ["
0896:                                        + getHostAddress() + "] : " + rec);
0897:                            }
0898:
0899:                            if (authorised == false)
0900:                                authStatus = clientAuthenticationHandler
0901:                                        .handleAuthentication(this , rec);
0902:                            else
0903:                                clientCommandHandler.handleCommand(this , rec);
0904:
0905:                            if (isClosed() == true)
0906:                                return;
0907:
0908:                            while (authStatus == AuthStatus.FAILURE)
0909:                                authStatus = processAuthorisation();
0910:
0911:                            if (authStatus == AuthStatus.SUCCESS)
0912:                                authorised = true;
0913:
0914:                            if (dataModeIN != DataMode.STRING) {
0915:                                break;
0916:                            }
0917:
0918:                            timeToCheckForNewLineMiss = false;
0919:                        }//end of while
0920:
0921:                        if (timeToCheckForNewLineMiss
0922:                                && bbin.availableOnlyInByteBuffer() == 0) {
0923:                            return;
0924:                        } else {
0925:                            timeToCheckForNewLineMiss = false;
0926:                        }
0927:                    }
0928:
0929:                    //} else if(dataModeIN == DataMode.OBJECT) {
0930:                    /*
0931:                    while(dataModeIN == DataMode.OBJECT && o_in!=null) {
0932:                    	recObject = o_in.readObject();
0933:                    	if(recObject==null) {
0934:                    		lost = true;
0935:                    		return;
0936:                    	}
0937:                    	if(getCommunicationLogging() && authorised == true) {
0938:                    		appLogger.fine("Got OBJECT ["+getHostAddress()+"] : "+
0939:                    			recObject.toString());
0940:                    	}
0941:
0942:
0943:                    	if(authorised == false)
0944:                    		authStatus = clientAuthenticationHandler.handleAuthentication(this, recObject);
0945:                    	else
0946:                    		clientObjectHandler.handleObject(this, recObject);
0947:                    	
0948:                    	if(isClosed()==true) return;
0949:
0950:                    	while(authStatus==AuthStatus.FAILURE)
0951:                    		authStatus = processAuthorisation();
0952:                    	
0953:                    	if(authStatus==AuthStatus.SUCCESS)
0954:                    		authorised = true;
0955:                    }
0956:                     */
0957:
0958:                    //} else if(dataModeIN == DataMode.BYTE) {
0959:                    while (dataModeIN == DataMode.BYTE
0960:                            && getInputStream().available() != 0) {
0961:                        rec = readBytes();
0962:                        if (rec == null) {
0963:                            lost = true;
0964:                            return;
0965:                        }
0966:                        if (getCommunicationLogging() && authorised == true) {
0967:                            appLogger.fine("Got BYTE [" + getHostAddress()
0968:                                    + "] : " + rec);
0969:                        }
0970:
0971:                        if (authorised == false)
0972:                            authStatus = clientAuthenticationHandler
0973:                                    .handleAuthentication(this , rec);
0974:                        else
0975:                            clientCommandHandler.handleCommand(this , rec);
0976:
0977:                        if (isClosed() == true)
0978:                            return;
0979:
0980:                        while (authStatus == AuthStatus.FAILURE)
0981:                            authStatus = processAuthorisation();
0982:
0983:                        if (authStatus == AuthStatus.SUCCESS)
0984:                            authorised = true;
0985:                    }
0986:
0987:                    //} else if(dataModeIN == DataMode.BINARY) {
0988:                    while (dataModeIN == DataMode.BINARY
0989:                            && getInputStream().available() != 0) {
0990:                        recByte = readBinary();
0991:                        if (recByte == null) {
0992:                            lost = true;
0993:                            return;
0994:                        }
0995:                        if (getCommunicationLogging() && authorised == true) {
0996:                            appLogger.fine("Got BINARY [" + getHostAddress()
0997:                                    + "] : "
0998:                                    + MyString.getMemInfo(recByte.length));
0999:                        }
1000:
1001:                        if (authorised == false)
1002:                            authStatus = clientAuthenticationHandler
1003:                                    .handleAuthentication(this , recByte);
1004:                        else
1005:                            clientBinaryHandler.handleBinary(this , recByte);
1006:
1007:                        if (isClosed() == true)
1008:                            return;
1009:
1010:                        while (authStatus == AuthStatus.FAILURE)
1011:                            authStatus = processAuthorisation();
1012:
1013:                        if (authStatus == AuthStatus.SUCCESS)
1014:                            authorised = true;
1015:                    }
1016:
1017:                    //} else {
1018:                    if (dataModeIN != DataMode.STRING
1019:                            && dataModeIN != DataMode.OBJECT
1020:                            && dataModeIN != DataMode.BYTE
1021:                            && dataModeIN != DataMode.BINARY) {
1022:                        throw new IllegalStateException(
1023:                                "Incoming DataMode is not supported : "
1024:                                        + dataModeIN);
1025:                    }
1026:                } while (getInputStream().available() != 0);
1027:            }
1028:
1029:            public void registerForRead() throws IOException,
1030:                    ClosedChannelException {
1031:                try {
1032:                    if (getSelectionKey() == null) {
1033:                        boolean flag = getServer().registerChannel(
1034:                                getSocketChannel(), SelectionKey.OP_READ, this );
1035:                        if (flag) {
1036:                            logger.finest("Adding OP_READ as interest Ops for "
1037:                                    + getName());
1038:                        } else if (ByteBufferOutputStream
1039:                                .isLoggable(Level.FINEST)) {
1040:                            logger
1041:                                    .finest("OP_READ is already present in interest Ops for "
1042:                                            + getName());
1043:                        }
1044:                    } else if (getSelectionKey().isValid()) {
1045:                        if ((getSelectionKey().interestOps() & SelectionKey.OP_READ) == 0) {
1046:                            logger.finest("Adding OP_READ to interest Ops for "
1047:                                    + getName());
1048:                            removeEvent(ClientEvent.READ);
1049:                            getSelectionKey().interestOps(
1050:                                    getSelectionKey().interestOps()
1051:                                            | SelectionKey.OP_READ);
1052:                            if (wakeupSelectorAfterRegisterRead) {
1053:                                getServer().getSelector().wakeup();
1054:                            }
1055:                        } else {
1056:                            if (ByteBufferOutputStream.isLoggable(Level.FINEST)) {
1057:                                logger
1058:                                        .finest("OP_READ is already present in interest Ops for "
1059:                                                + getName());
1060:                            }
1061:                        }
1062:                    } else {
1063:                        throw new IOException("SelectionKey is invalid!");
1064:                    }
1065:                } catch (CancelledKeyException e) {
1066:                    throw new IOException("SelectionKey is cancelled!");
1067:                }
1068:            }
1069:
1070:            public void registerForWrite() throws IOException,
1071:                    ClosedChannelException {
1072:                if (hasEvent(ClientEvent.RUN_BLOCKING)
1073:                        || hasEvent(ClientEvent.MAX_CON_BLOCKING)) {
1074:                    throw new IllegalStateException(
1075:                            "This method is only allowed under Non-Blocking mode.");
1076:                }
1077:
1078:                if (clientWriteHandler == null) {
1079:                    throw new IllegalStateException(
1080:                            "ClientWriteHandler has not been set!");
1081:                }
1082:                registerWrite();
1083:            }
1084:
1085:            public void registerWrite() throws IOException {
1086:                try {
1087:                    if (getSelectionKey() == null) {
1088:                        boolean flag = getServer()
1089:                                .registerChannel(getSocketChannel(),
1090:                                        SelectionKey.OP_WRITE, this );
1091:                        if (flag) {
1092:                            logger
1093:                                    .finest("Adding OP_WRITE as interest Ops for "
1094:                                            + getName());
1095:                        } else if (ByteBufferOutputStream
1096:                                .isLoggable(Level.FINEST)) {
1097:                            logger
1098:                                    .finest("OP_WRITE is already present in interest Ops for "
1099:                                            + getName());
1100:                        }
1101:                    } else if (getSelectionKey().isValid()) {
1102:                        if ((getSelectionKey().interestOps() & SelectionKey.OP_WRITE) == 0) {
1103:                            logger
1104:                                    .finest("Adding OP_WRITE to interest Ops for "
1105:                                            + getName());
1106:                            removeEvent(ClientEvent.WRITE);
1107:                            getSelectionKey().interestOps(
1108:                                    getSelectionKey().interestOps()
1109:                                            | SelectionKey.OP_WRITE);
1110:                            if (wakeupSelectorAfterRegisterWrite) {
1111:                                getServer().getSelector().wakeup();
1112:                            }
1113:                        } else {
1114:                            if (ByteBufferOutputStream.isLoggable(Level.FINEST)) {
1115:                                logger
1116:                                        .finest("OP_WRITE is already present in interest Ops for "
1117:                                                + getName());
1118:                            }
1119:                        }
1120:                    } else {
1121:                        throw new IOException("SelectionKey is invalid!");
1122:                    }
1123:                } catch (CancelledKeyException e) {
1124:                    throw new IOException("SelectionKey is cancelled!");
1125:                }
1126:            }
1127:
1128:            protected void setClientWriteHandler(ClientWriteHandler handler) {
1129:                clientWriteHandler = handler;
1130:            }
1131:
1132:            /**
1133:             * Returns number of thread currently in this object.
1134:             * @since 1.4.6
1135:             */
1136:            public int getThreadAccessCount() {
1137:                return threadAccessCount;
1138:            }
1139:        }
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.