Source Code Cross Referenced for SharedSocket.java in jTDS (net.sourceforge.jtds.jdbc)


// jTDS JDBC Driver for Microsoft SQL Server and Sybase
// Copyright (C) 2004 The jTDS Project
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
//
package net.sourceforge.jtds.jdbc;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.LinkedList;

import net.sourceforge.jtds.ssl.SocketFactories;
import net.sourceforge.jtds.util.Logger;

/**
 * This class manages the physical connection to the SQL Server and
 * serialises its use amongst a number of virtual sockets.
 * This allows one physical connection to service a number of concurrent
 * statements.
 * <p>
 * Constraints and assumptions:
 * <ol>
 * <li>Callers will not attempt to read from the server without issuing a request first.
 * <li>The end of a server reply can be identified because byte 2 of the header is non-zero.
 * </ol>
 * </p>
 * Comments:
 * <ol>
 * <li>This code will discard unread server data if a new request is issued.
 *    Currently the higher levels of the driver attempt to do this, but maybe
 *    we can just rely on this code instead.
 * <li>A cancel can be issued by a caller only if the server is currently sending
 *    data for the caller; otherwise the cancel is ignored.
 * <li>Cancel packets on their own are returned as extra records appended to the
 *     previous packet so that the TdsCore module can process them.
 * </ol>
 * This version of the class will start to cache results to disk once a predetermined
 * maximum buffer memory threshold has been passed. Small result sets that will fit
 * within a specified limit (default 8 packets) will continue to be held in memory
 * (even if the memory threshold has been passed) in the interests of efficiency.
 *
 * @author Mike Hutchinson.
 * @version $Id: SharedSocket.java,v 1.39 2007/07/08 21:38:13 bheineman Exp $
 */
class SharedSocket {
    /**
     * This inner class contains the state information for the virtual socket.
     */
    private static class VirtualSocket {
        /**
         * The stream ID of the stream objects owning this state.
         */
        final int owner;
        /**
         * Memory resident packet queue.
         */
        final LinkedList pktQueue;
        /**
         * True to discard network data.
         */
        boolean flushInput;
        /**
         * True if output is a complete TDS packet.
         */
        boolean complete;
        /**
         * File object for disk packet queue.
         */
        File queueFile;
        /**
         * I/O Stream for disk packet queue.
         */
        RandomAccessFile diskQueue;
        /**
         * Number of packets cached to disk.
         */
        int pktsOnDisk;
        /**
         * Total of input packets in memory or on disk.
         */
        int inputPkts;

        /**
         * Construct object to hold state information for each caller.
         * @param streamId the Response/Request stream id.
         */
        VirtualSocket(int streamId) {
            this.owner = streamId;
            this.pktQueue = new LinkedList();
            this.flushInput = false;
            this.complete = false;
            this.queueFile = null;
            this.diskQueue = null;
            this.pktsOnDisk = 0;
            this.inputPkts = 0;
        }
    }

    /**
     * The shared network socket.
     */
    private Socket socket;
    /**
     * The shared SSL network socket.
     */
    private Socket sslSocket;
    /**
     * Output stream for network socket.
     */
    private DataOutputStream out;
    /**
     * Input stream for network socket.
     */
    private DataInputStream in;
    /**
     * Current maximum input buffer size.
     */
    private int maxBufSize = TdsCore.MIN_PKT_SIZE;
    /**
     * Table of stream objects sharing this socket.
     */
    private final ArrayList socketTable = new ArrayList();
    /**
     * The Stream ID of the object that is expecting a response from the server.
     */
    private int responseOwner = -1;
    /**
     * Buffer for packet header.
     */
    private final byte hdrBuf[] = new byte[TDS_HDR_LEN];
    /**
     * The directory to buffer data to.
     */
    private final File bufferDir;
    /**
     * Total memory usage in all instances of the driver.
     * NB. Access to this field should probably be synchronized,
     * but in practice lost updates will not matter much and I think
     * all VMs tend to do atomic saves to integer variables.
     */
    private static int globalMemUsage;
    /**
     * Peak memory usage for debug purposes.
     */
    private static int peakMemUsage;
    /**
     * Max memory limit to use for buffers.
     * Only when this limit is exceeded will the driver
     * start caching to disk.
     */
    private static int memoryBudget = 100000; // 100K
    /**
     * Minimum number of packets that will be cached in memory
     * before the driver tries to write to disk even if
     * memoryBudget has been exceeded.
     */
    private static int minMemPkts = 8;
    /**
     * Global flag to indicate that security constraints mean
     * that attempts to create work files will fail.
     */
    private static boolean securityViolation;
    /**
     * TDS protocol version.
     */
    private int tdsVersion;
    /**
     * The server type; one of Driver.SQLSERVER or Driver.SYBASE.
     */
    protected final int serverType;
    /**
     * The character set to use for converting strings to/from bytes.
     */
    private CharsetInfo charsetInfo;
    /**
     * Count of packets received.
     */
    private int packetCount;
    /**
     * The server host name.
     */
    private String host;
    /**
     * The server port number.
     */
    private int port;
    /**
     * A cancel packet is pending.
     */
    private boolean cancelPending;
    /**
     * Synchronization monitor for {@link #cancelPending} and
     * {@link #responseOwner}.
     */
    private Object cancelMonitor = new Object();
    /**
     * Buffer for TDS_DONE packets.
     */
    private byte doneBuffer[] = new byte[TDS_DONE_LEN];
    /**
     * TDS done token.
     */
    private static final int TDS_DONE_TOKEN = 253;
    /**
     * Length of a TDS_DONE token.
     */
    private static final int TDS_DONE_LEN = 9;
    /**
     * Length of TDS packet header.
     */
    private static final int TDS_HDR_LEN = 8;

    protected SharedSocket(File bufferDir, int tdsVersion,
            int serverType) {
        this.bufferDir = bufferDir;
        this.tdsVersion = tdsVersion;
        this.serverType = serverType;
    }

    /**
     * Construct a <code>SharedSocket</code> object specifying host name and
     * port.
     *
     * @param connection the connection object
     * @throws IOException if socket open fails
     */
    SharedSocket(ConnectionJDBC2 connection) throws IOException,
            UnknownHostException {
        this(connection.getBufferDir(), connection.getTdsVersion(),
                connection.getServerType());
        this.host = connection.getServerName();
        this.port = connection.getPortNumber();
        if (Driver.JDBC3) {
            this.socket = createSocketForJDBC3(connection);
        } else {
            this.socket = new Socket(this.host, this.port);
        }
        setOut(new DataOutputStream(socket.getOutputStream()));
        setIn(new DataInputStream(socket.getInputStream()));
        this.socket.setTcpNoDelay(connection.getTcpNoDelay());
        this.socket.setSoTimeout(connection.getSocketTimeout() * 1000);
    }

    /**
     * Creates a {@link Socket} through reflection when {@link Driver#JDBC3}
     * is <code>true</code>.  Reflection must be used to stay compatible
     * with JDK 1.3.
     *
     * @param connection the connection object
     * @return a socket open to the host and port with the given timeout
     * @throws IOException if socket open fails
     */
    private Socket createSocketForJDBC3(ConnectionJDBC2 connection)
            throws IOException {
        final String host = connection.getServerName();
        final int port = connection.getPortNumber();
        final int loginTimeout = connection.getLoginTimeout();
        final String bindAddress = connection.getBindAddress();
        try {
            // Create the Socket
            Constructor socketConstructor =
                    Socket.class.getConstructor(new Class[] {});
            Socket socket =
                    (Socket) socketConstructor.newInstance(new Object[] {});

            // Create the InetSocketAddress
            Constructor constructor = Class.forName("java.net.InetSocketAddress")
                    .getConstructor(new Class[] { String.class, int.class });
            Object address = constructor.newInstance(
                    new Object[] { host, new Integer(port) });

            // Call Socket.bind(SocketAddress) if bindAddress parameter is set
            if (bindAddress != null && bindAddress.length() > 0) {
                Object localBindAddress = constructor.newInstance(
                        new Object[] { bindAddress, new Integer(0) });
                Method bind = Socket.class.getMethod("bind",
                        new Class[] { Class.forName("java.net.SocketAddress") });
                bind.invoke(socket, new Object[] { localBindAddress });
            }

            // Call Socket.connect(InetSocketAddress, int)
            Method connect = Socket.class.getMethod("connect",
                    new Class[] { Class.forName("java.net.SocketAddress"),
                            int.class });
            connect.invoke(socket,
                    new Object[] { address, new Integer(loginTimeout * 1000) });

            return socket;
        } catch (InvocationTargetException ite) {
            // Reflection was OK but invocation of socket.bind() or socket.connect()
            // has failed. Try to report the underlying reason
            Throwable cause = ite.getTargetException();
            if (cause instanceof IOException) {
                // OK was an IOException or subclass so just throw it
                throw (IOException) cause;
            }
            // Something else so return invocation exception anyway
            // (This should not normally occur)
            throw (IOException) Support.linkException(new IOException(
                    "Could not create socket"), cause);
        } catch (Exception e) {
            // Reflection has failed for some reason e.g. security so
            // try to create a socket in the old way.
            return new Socket(host, port);
        }
    }

    /**
     * Enable TLS encryption by creating a TLS socket over the
     * existing TCP/IP network socket.
     *
     * @param ssl the SSL URL property value
     * @throws IOException if an I/O error occurs
     */
    void enableEncryption(String ssl) throws IOException {
        Logger.println("Enabling TLS encryption");
        sslSocket = SocketFactories.getSocketFactory(ssl, socket)
                .createSocket(getHost(), getPort());
        setOut(new DataOutputStream(sslSocket.getOutputStream()));
        setIn(new DataInputStream(sslSocket.getInputStream()));
    }

    /**
     * Disable TLS encryption and switch back to raw TCP/IP socket.
     *
     * @throws IOException if an I/O error occurs
     */
    void disableEncryption() throws IOException {
        Logger.println("Disabling TLS encryption");
        sslSocket.close();
        sslSocket = null;
        setOut(new DataOutputStream(socket.getOutputStream()));
        setIn(new DataInputStream(socket.getInputStream()));
    }

    /**
     * Set the character set descriptor to be used to translate byte arrays to
     * or from Strings.
     *
     * @param charsetInfo the character set descriptor
     */
    void setCharsetInfo(CharsetInfo charsetInfo) {
        this.charsetInfo = charsetInfo;
    }

    /**
     * Retrieve the character set descriptor used to translate byte arrays to
     * or from Strings.
     */
    CharsetInfo getCharsetInfo() {
        return charsetInfo;
    }

    /**
     * Retrieve the character set name used to translate byte arrays to
     * or from Strings.
     *
     * @return the character set name as a <code>String</code>
     */
    String getCharset() {
        return charsetInfo.getCharset();
    }

    /**
     * Obtain an instance of a server request stream for this socket.
     *
     * @param bufferSize the initial buffer size to be used by the
     *                   <code>RequestStream</code>
     * @param maxPrecision the maximum precision for numeric/decimal types
     * @return the server request stream as a <code>RequestStream</code>
     */
    RequestStream getRequestStream(int bufferSize, int maxPrecision) {
        synchronized (socketTable) {
            // Reuse the first free slot in the table, if there is one
            int id;
            for (id = 0; id < socketTable.size(); id++) {
                if (socketTable.get(id) == null) {
                    break;
                }
            }

            VirtualSocket vsock = new VirtualSocket(id);

            if (id >= socketTable.size()) {
                socketTable.add(vsock);
            } else {
                socketTable.set(id, vsock);
            }

            return new RequestStream(this, id, bufferSize, maxPrecision);
        }
    }

    /**
     * Obtain an instance of a server response stream for this socket.
     * NB. getRequestStream() must be used first to obtain the RequestStream
     * needed as a parameter for this method.
     *
     * @param requestStream an existing server request stream object obtained
     *                      from this <code>SharedSocket</code>
     * @param bufferSize    the initial buffer size to be used by the
     *                      <code>ResponseStream</code>
     * @return the server response stream as a <code>ResponseStream</code>
     */
    ResponseStream getResponseStream(RequestStream requestStream,
            int bufferSize) {
        return new ResponseStream(this, requestStream.getStreamId(),
                bufferSize);
    }

    /**
     * Retrieve the TDS version that is active on the connection
     * supported by this socket.
     *
     * @return the TDS version as an <code>int</code>
     */
    int getTdsVersion() {
        return tdsVersion;
    }

    /**
     * Set the TDS version field.
     *
     * @param tdsVersion the TDS version as an <code>int</code>
     */
    protected void setTdsVersion(int tdsVersion) {
        this.tdsVersion = tdsVersion;
    }

    /**
     * Set the global buffer memory limit for all instances of this driver.
     *
     * @param memoryBudget the global memory budget
     */
    static void setMemoryBudget(int memoryBudget) {
        SharedSocket.memoryBudget = memoryBudget;
    }

    /**
     * Get the global buffer memory limit for all instances of this driver.
     *
     * @return the memory limit as an <code>int</code>
     */
    static int getMemoryBudget() {
        return SharedSocket.memoryBudget;
    }

    /**
     * Set the minimum number of packets to cache in memory before
     * writing to disk.
     *
     * @param minMemPkts the minimum number of packets to cache
     */
    static void setMinMemPkts(int minMemPkts) {
        SharedSocket.minMemPkts = minMemPkts;
    }

    /**
     * Get the minimum number of memory cached packets.
     *
     * @return minimum memory packets as an <code>int</code>
     */
    static int getMinMemPkts() {
        return SharedSocket.minMemPkts;
    }

    /**
     * Get the connected status of this socket.
     *
     * @return <code>true</code> if the underlying socket is connected
     */
    boolean isConnected() {
        return this.socket != null;
    }

    /**
     * Send a TDS cancel packet to the server.
     *
     * @param streamId the <code>RequestStream</code> id
     * @return <code>boolean</code> true if a cancel is actually
     * issued by this method call.
     */
    boolean cancel(int streamId) {
        //
        // Need to synchronize packet send to avoid race conditions on
        // responseOwner and cancelPending
        //
        synchronized (cancelMonitor) {
            //
            // Only send if response pending for the caller.
            // Caller must have acquired connection mutex first.
            // NB. This method will not work with local named pipes
            // as this thread will be blocked in the write until the
            // reading thread has returned from the read.
            //
            if (responseOwner == streamId && !cancelPending) {
                try {
                    //
                    // Send a cancel packet.
                    //
                    cancelPending = true;
                    byte[] cancel = new byte[TDS_HDR_LEN];
                    cancel[0] = TdsCore.CANCEL_PKT;
                    cancel[1] = 1;
                    cancel[2] = 0;
                    cancel[3] = 8;
                    cancel[4] = 0;
                    cancel[5] = 0;
                    cancel[6] = (tdsVersion >= Driver.TDS70) ? (byte) 1 : 0;
                    cancel[7] = 0;
                    getOut().write(cancel, 0, TDS_HDR_LEN);
                    getOut().flush();
                    if (Logger.isActive()) {
                        Logger.logPacket(streamId, false, cancel);
                    }
                    return true;
                } catch (IOException e) {
                    // Ignore error as network is probably dead anyway
                }
            }
        }
        return false;
    }

    /**
     * Close the socket and release all resources.
     *
     * @throws IOException if the socket close fails
     */
    void close() throws IOException {
        if (Logger.isActive()) {
            Logger.println("TdsSocket: Max buffer memory used = "
                    + (peakMemUsage / 1024) + "KB");
        }

        synchronized (socketTable) {
            // See if any temporary files need deleting
            for (int i = 0; i < socketTable.size(); i++) {
                VirtualSocket vsock = (VirtualSocket) socketTable.get(i);

                if (vsock != null && vsock.diskQueue != null) {
                    try {
                        vsock.diskQueue.close();
                        vsock.queueFile.delete();
                    } catch (IOException ioe) {
                        // Ignore errors
                    }
                }
            }
            try {
                if (sslSocket != null) {
                    sslSocket.close();
                    sslSocket = null;
                }
            } finally {
                // Close physical socket
                if (socket != null) {
                    socket.close();
                }
            }
        }
    }

    /**
     * Force close the socket causing any pending reads/writes to fail.
     * <p>
     * Used by the login timer to abort a login attempt.
     */
    void forceClose() {
        if (socket != null) {
            try {
                socket.close();
            } catch (IOException ioe) {
                // Ignore
            } finally {
                sslSocket = null;
                socket = null;
            }
        }
    }

    /**
     * Deallocate a stream linked to this socket.
     *
     * @param streamId the <code>ResponseStream</code> id
     */
    void closeStream(int streamId) {
        synchronized (socketTable) {
            VirtualSocket vsock = lookup(streamId);

            if (vsock.diskQueue != null) {
                try {
                    vsock.diskQueue.close();
                    vsock.queueFile.delete();
                } catch (IOException ioe) {
                    // Ignore errors
                }
            }

            socketTable.set(streamId, null);
        }
    }

0634:            /**
0635:             * Send a network packet. If output for another virtual socket is
0636:             * in progress this packet will be sent later.
0637:             *
0638:             * @param streamId the originating <code>RequestStream</code> object
0639:             * @param buffer   the data to send
0640:             * @return the same buffer received if emptied or another buffer w/ the
0641:             *         same size if the incoming buffer is cached (to avoid copying)
0642:             * @throws IOException if an I/O error occurs
0643:             */
0644:            byte[] sendNetPacket(int streamId, byte buffer[])
0645:                    throws IOException {
0646:                synchronized (socketTable) {
0647:
0648:                    VirtualSocket vsock = lookup(streamId);
0649:
0650:                    while (vsock.inputPkts > 0) {
0651:                        //
0652:                        // There is unread data in the input buffers.
0653:                        // As we are sending another packet we can just discard it now.
0654:                        //
0655:                        if (Logger.isActive()) {
0656:                            Logger
0657:                                    .println("TdsSocket: Unread data in input packet queue");
0658:                        }
0659:                        dequeueInput(vsock);
0660:                    }
0661:
0662:                    if (responseOwner != -1) {
0663:                        //
0664:                        // Complex case: there is another stream's data in the network pipe
0665:                        // or we had our own incomplete request to discard first.
0666:                        // Read and store the other stream's data or flush our own.
0667:                        //
0668:                        VirtualSocket other = (VirtualSocket) socketTable
0669:                                .get(responseOwner);
0670:                        byte[] tmpBuf = null;
0671:                        boolean ourData = (other.owner == streamId);
0672:                        do {
0673:                            // Reuse the buffer if it's our data; we don't need it
0674:                            tmpBuf = readPacket(ourData ? tmpBuf : null);
0675:
0676:                            if (!ourData) {
0677:                                // We need to save this input as it belongs to
0678:                                // another thread.
0679:                                enqueueInput(other, tmpBuf);
0680:                            } // Any of our input is discarded.
0681:
0682:                        } while (tmpBuf[1] == 0); // Read all data to complete TDS packet
0683:                    }
0684:                    //
0685:                    // At this point we know that we are able to send the first
0686:                    // or subsequent packet of a new request.
0687:                    //
0688:                    getOut().write(buffer, 0, getPktLen(buffer));
0689:
0690:                    if (buffer[1] != 0) {
0691:                        getOut().flush();
0692:                        // We are the response owner now
0693:                        responseOwner = streamId;
0694:                    }
0695:
0696:                    return buffer;
0697:                }
0698:            }
0699:
0700:            /**
0701:             * Get a network packet. This may be read from the network directly or from
0702:             * previously cached buffers.
0703:             *
0704:             * @param streamId the id of the originating <code>ResponseStream</code>
0705:             * @param buffer   the data buffer to receive the packet (may be replaced)
0706:             * @return the data in a <code>byte[]</code> buffer
0707:             * @throws IOException if an I/O error occurs
0708:             */
0709:            byte[] getNetPacket(int streamId, byte buffer[]) throws IOException {
0710:                synchronized (socketTable) {
0711:                    VirtualSocket vsock = lookup(streamId);
0712:
0713:                    //
0714:                    // Return any cached input
0715:                    //
0716:                    if (vsock.inputPkts > 0) {
0717:                        return dequeueInput(vsock);
0718:                    }
0719:
0720:                    //
0721:                    // Nothing cached; see if we are expecting network data
0722:                    //
0723:                    if (responseOwner == -1) {
0724:                        throw new IOException(
0725:                                "Stream "
0726:                                        + streamId
0727:                                        + " attempting to read when no request has been sent");
0728:                    }
0729:                    //
0730:                    // OK, there should be data; check that it is for this stream
0731:                    //
0732:                    if (responseOwner != streamId) {
0733:                        // Error: we are trying to read data that belongs to another stream.
0734:                        throw new IOException(
0735:                                "Stream "
0736:                                        + streamId
0737:                                        + " is trying to read data that belongs to stream "
0738:                                        + responseOwner);
0739:                    }
0740:                    //
0741:                    // Simple case: we are reading our input directly from the server
0742:                    //
0743:                    return readPacket(buffer);
0744:                }
0745:            }
0746:
0747:            /**
0748:             * Save a packet buffer in a memory queue or to a disk queue if the global
0749:             * memory limit for the driver has been exceeded.
0750:             *
0751:             * @param vsock  the virtual socket owning this data
0752:             * @param buffer the data to queue
0753:             */
0754:            private void enqueueInput(VirtualSocket vsock, byte[] buffer)
0755:                    throws IOException {
0756:                //
0757:                // Check to see if we should start caching to disk
0758:                //
0759:                if (globalMemUsage + buffer.length > memoryBudget
0760:                        && vsock.pktQueue.size() >= minMemPkts
0761:                        && !securityViolation && vsock.diskQueue == null) {
0762:                    // Try to create a disk file for the queue
0763:                    try {
0764:                        vsock.queueFile = File.createTempFile("jtds", ".tmp",
0765:                                bufferDir);
0766:                        vsock.queueFile.deleteOnExit();
0767:                        vsock.diskQueue = new RandomAccessFile(vsock.queueFile,
0768:                                "rw");
0769:
0770:                        // Write current cache contents to disk and free memory
0771:                        byte[] tmpBuf;
0772:
0773:                        while (vsock.pktQueue.size() > 0) {
0774:                            tmpBuf = (byte[]) vsock.pktQueue.removeFirst();
0775:                            vsock.diskQueue.write(tmpBuf, 0, getPktLen(tmpBuf));
0776:                            vsock.pktsOnDisk++;
0777:                        }
0778:                    } catch (java.lang.SecurityException se) {
0779:                        // Not allowed to cache to disk so carry on in memory
0780:                        securityViolation = true;
0781:                        vsock.queueFile = null;
0782:                        vsock.diskQueue = null;
0783:                    }
0784:                }
0785:
0786:                if (vsock.diskQueue != null) {
0787:                    // Cache file exists so append buffer to it
0788:                    vsock.diskQueue.write(buffer, 0, getPktLen(buffer));
0789:                    vsock.pktsOnDisk++;
0790:                } else {
0791:                    // Will cache in memory
0792:                    vsock.pktQueue.addLast(buffer);
0793:                    globalMemUsage += buffer.length;
0794:
0795:                    if (globalMemUsage > peakMemUsage) {
0796:                        peakMemUsage = globalMemUsage;
0797:                    }
0798:                }
0799:
0800:                vsock.inputPkts++;
0801:            }
0802:
0803:            /**
0804:             * Read a cached packet from the in-memory queue or from a disk-based queue.
0805:             *
0806:             * @param vsock the virtual socket owning this data
0807:             * @return a buffer containing the packet
0808:             */
0809:            private byte[] dequeueInput(VirtualSocket vsock) throws IOException {
0810:                byte[] buffer = null;
0811:
0812:                if (vsock.pktsOnDisk > 0) {
0813:                    // Data is cached on disk
0814:                    if (vsock.diskQueue.getFilePointer() == vsock.diskQueue
0815:                            .length()) {
0816:                        // First read so rewind() file
0817:                        vsock.diskQueue.seek(0L);
0818:                    }
0819:
0820:                    vsock.diskQueue.readFully(hdrBuf, 0, TDS_HDR_LEN);
0821:
0822:                    int len = getPktLen(hdrBuf);
0823:
0824:                    buffer = new byte[len];
0825:                    System.arraycopy(hdrBuf, 0, buffer, 0, TDS_HDR_LEN);
0826:                    vsock.diskQueue.readFully(buffer, TDS_HDR_LEN, len
0827:                            - TDS_HDR_LEN);
0828:                    vsock.pktsOnDisk--;
0829:
0830:                    if (vsock.pktsOnDisk < 1) {
0831:                        // File now empty so close and delete it
0832:                        try {
0833:                            vsock.diskQueue.close();
0834:                            vsock.queueFile.delete();
0835:                        } finally {
0836:                            vsock.queueFile = null;
0837:                            vsock.diskQueue = null;
0838:                        }
0839:                    }
0840:                } else if (vsock.pktQueue.size() > 0) {
0841:                    buffer = (byte[]) vsock.pktQueue.removeFirst();
0842:                    globalMemUsage -= buffer.length;
0843:                }
0844:
0845:                if (buffer != null) {
0846:                    vsock.inputPkts--;
0847:                }
0848:
0849:                return buffer;
0850:            }
0851:
0852:            /**
0853:             * Read a physical TDS packet from the network.
0854:             *
0855:             * @param buffer a buffer to read the data into (if it fits) or null
0856:             * @return either the incoming buffer if it was large enough or a newly
0857:             *         allocated buffer with the read packet
0858:             */
0859:            private byte[] readPacket(byte buffer[]) throws IOException {
0860:                //
0861:                // Read the packet header
0862:                try {
0863:                    getIn().readFully(hdrBuf);
0864:                } catch (EOFException e) {
0865:                    throw new IOException("DB server closed connection.");
0866:                }
0867:
0868:                byte packetType = hdrBuf[0];
0869:
0870:                if (packetType != TdsCore.LOGIN_PKT
0871:                        && packetType != TdsCore.QUERY_PKT
0872:                        && packetType != TdsCore.REPLY_PKT) {
0873:                    throw new IOException("Unknown packet type 0x"
0874:                            + Integer.toHexString(packetType & 0xFF));
0875:                }
0876:
0877:                // figure out how many bytes are remaining in this packet.
0878:                int len = getPktLen(hdrBuf);
0879:
0880:                if (len < TDS_HDR_LEN || len > 65536) {
0881:                    throw new IOException("Invalid network packet length "
0882:                            + len);
0883:                }
0884:
0885:                if (buffer == null || len > buffer.length) {
0886:                    // Create or expand the buffer as required
0887:                    buffer = new byte[len];
0888:
0889:                    if (len > maxBufSize) {
0890:                        maxBufSize = len;
0891:                    }
0892:                }
0893:
0894:                // Preserve the packet header in the buffer
0895:                System.arraycopy(hdrBuf, 0, buffer, 0, TDS_HDR_LEN);
0896:
0897:                try {
0898:                    getIn().readFully(buffer, TDS_HDR_LEN, len - TDS_HDR_LEN);
0899:                } catch (EOFException e) {
0900:                    throw new IOException("DB server closed connection.");
0901:                }
0902:
0903:                //
0904:                // SQL Server 2000 < SP3 does not set the last packet
0905:                // flag in the NT challenge packet.
0906:                // If this is the first packet and the length is correct
0907:                // force the last packet flag on.
0908:                //
0909:                if (++packetCount == 1 && serverType == Driver.SQLSERVER
0910:                        && "NTLMSSP".equals(new String(buffer, 11, 7))) {
0911:                    buffer[1] = 1;
0912:                }
0913:
0914:                synchronized (cancelMonitor) {
0915:                    //
0916:                    // If a cancel request is outstanding check that the last TDS packet
0917:                    // is a TDS_DONE with the "cancel ACK" flag set. If it isn't set the
0918:                    // "more packets" flag; this will ensure that the stream keeps
0919:                    // processing until the "cancel ACK" is processed.
0920:                    //
0921:                    if (cancelPending) {
0922:                        //
0923:                        // Move what we assume to be the TDS_DONE packet into doneBuffer
0924:                        //
0925:                        if (len >= TDS_DONE_LEN + TDS_HDR_LEN) {
0926:                            System.arraycopy(buffer, len - TDS_DONE_LEN,
0927:                                    doneBuffer, 0, TDS_DONE_LEN);
0928:                        } else {
0929:                            // Packet too short so TDS_DONE record was split over
0930:                            // two packets. Need to reassemble.
0931:                            int frag = len - TDS_HDR_LEN;
0932:                            System.arraycopy(doneBuffer, frag, doneBuffer, 0,
0933:                                    TDS_DONE_LEN - frag);
0934:                            System.arraycopy(buffer, TDS_HDR_LEN, doneBuffer,
0935:                                    TDS_DONE_LEN - frag, frag);
0936:                        }
0937:                        //
0938:                        // If this is the last packet and there is a cancel pending see
0939:                        // if the last packet contains a TDS_DONE token with the cancel
0940:                        // ACK set. If not reset the last packet flag so that the dedicated
0941:                        // cancel packet is also read and processed.
0942:                        //
0943:                        if (buffer[1] == 1) {
0944:                            if ((doneBuffer[0] & 0xFF) < TDS_DONE_TOKEN) {
0945:                                throw new IOException(
0946:                                        "Expecting a TDS_DONE or TDS_DONEPROC.");
0947:                            }
0948:
0949:                            if ((doneBuffer[1] & TdsCore.DONE_CANCEL) != 0) {
0950:                                // OK have a cancel ACK packet
0951:                                cancelPending = false;
0952:                            } else {
0953:                                // Must be in next packet so
0954:                                // force client to read next packet
0955:                                buffer[1] = 0;
0956:                            }
0957:                        }
0958:                    }
0959:
0960:                    if (buffer[1] != 0) {
0961:                        // End of response; connection now free
0962:                        responseOwner = -1;
0963:                    }
0964:                }
0965:
0966:                return buffer;
0967:            }
0968:
0969:            /**
0970:             * Retrieves the virtual socket with the given id.
0971:             *
0972:             * @param streamId id of the virtual socket to retrieve
0973:             */
0974:            private VirtualSocket lookup(int streamId) {
0975:                if (streamId < 0 || streamId >= socketTable.size()) {
0976:                    throw new IllegalArgumentException(
0977:                            "Invalid parameter stream ID " + streamId);
0978:                }
0979:
0980:                VirtualSocket vsock = (VirtualSocket) socketTable.get(streamId);
0981:
0982:                if (vsock.owner != streamId) {
0983:                    throw new IllegalStateException(
0984:                            "Internal error: bad stream ID " + streamId);
0985:                }
0986:
0987:                return vsock;
0988:            }
0989:
0990:            /**
0991:             * Convert the two packet length bytes (in network byte order, at
0992:             * offsets 2 and 3 of the packet header) into a Java <code>int</code>.
0993:             *
0994:             * @param buf    array of data
0995:             * @return the 16 bit unsigned value as an <code>int</code>
0996:             */
0997:            static int getPktLen(byte buf[]) {
0998:                int lo = ((int) buf[3] & 0xff);
0999:                int hi = (((int) buf[2] & 0xff) << 8);
1000:
1001:                return hi | lo;
1002:            }
1003:
1004:            /**
1005:             * Set the socket timeout.
1006:             *
1007:             * @param timeout the timeout value in milliseconds
1008:             */
1009:            protected void setTimeout(int timeout) throws SocketException {
1010:                socket.setSoTimeout(timeout);
1011:            }
1012:
1013:            /**
1014:             * Getter for {@link SharedSocket#in} field.
1015:             *
1016:             * @return {@link InputStream} used for communication
1017:             */
1018:            protected DataInputStream getIn() {
1019:                return in;
1020:            }
1021:
1022:            /**
1023:             * Setter for {@link SharedSocket#in} field.
1024:             *
1025:             * @param in the {@link InputStream} to be used for communication
1026:             */
1027:            protected void setIn(DataInputStream in) {
1028:                this.in = in;
1029:            }
1030:
1031:            /**
1032:             * Getter for {@link SharedSocket#out} field.
1033:             *
1034:             * @return {@link OutputStream} used for communication
1035:             */
1036:            protected DataOutputStream getOut() {
1037:                return out;
1038:            }
1039:
1040:            /**
1041:             * Setter for {@link SharedSocket#out} field.
1042:             *
1043:             * @param out the {@link OutputStream} to be used for communication
1044:             */
1045:            protected void setOut(DataOutputStream out) {
1046:                this.out = out;
1047:            }
1048:
1049:            /**
1050:             * Get the server host name.
1051:             *
1052:             * @return the host name as a <code>String</code>
1053:             */
1054:            protected String getHost() {
1055:                return this.host;
1056:            }
1057:
1058:            /**
1059:             * Get the server port number.
1060:             *
1061:             * @return the host port as an <code>int</code>
1062:             */
1063:            protected int getPort() {
1064:                return this.port;
1065:            }
1066:        }
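
The listing's getPktLen() and readPacket() both rely on two header fields: the packet length stored big-endian at offsets 2 and 3, and the "last packet" flag at offset 1. The following is a minimal sketch of that header handling, not part of jTDS. The class name TdsHeaderSketch, its method names, and the HDR_LEN value of 8 are assumptions made for illustration (the listing refers to TDS_HDR_LEN, whose definition lies outside this section).

```java
// Hypothetical helper, not part of jTDS. It mirrors the header handling
// visible in SharedSocket.getPktLen() and SharedSocket.readPacket().
final class TdsHeaderSketch {
    // Assumed header length; the listing uses TDS_HDR_LEN, defined elsewhere.
    static final int HDR_LEN = 8;

    // Combine header bytes 2 (high) and 3 (low) into an unsigned 16-bit length.
    // Masking with 0xff stops Java's signed bytes from sign-extending.
    static int pktLen(byte[] hdr) {
        int hi = (hdr[2] & 0xff) << 8;
        int lo = hdr[3] & 0xff;
        return hi | lo;
    }

    // A non-zero byte at offset 1 marks the final packet of a request or response.
    static boolean isLastPacket(byte[] hdr) {
        return hdr[1] != 0;
    }

    // A length shorter than the header cannot describe a real packet.
    static boolean isPlausibleLength(int len) {
        return len >= HDR_LEN && len <= 0xFFFF;
    }

    public static void main(String[] args) {
        // Type 0x04, last-packet flag set, length bytes 0x80 0x00.
        byte[] hdr = {0x04, 0x01, (byte) 0x80, 0x00, 0, 0, 0, 0};
        System.out.println(pktLen(hdr));                    // prints 32768
        System.out.println(isLastPacket(hdr));              // prints true
        System.out.println(isPlausibleLength(pktLen(hdr))); // prints true
    }
}
```

The 0x80 length byte is chosen deliberately: without the 0xff mask it would be read as -128 and the computed length would come out negative.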