0001: // jTDS JDBC Driver for Microsoft SQL Server and Sybase
0002: // Copyright (C) 2004 The jTDS Project
0003: //
0004: // This library is free software; you can redistribute it and/or
0005: // modify it under the terms of the GNU Lesser General Public
0006: // License as published by the Free Software Foundation; either
0007: // version 2.1 of the License, or (at your option) any later version.
0008: //
0009: // This library is distributed in the hope that it will be useful,
0010: // but WITHOUT ANY WARRANTY; without even the implied warranty of
0011: // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
0012: // Lesser General Public License for more details.
0013: //
0014: // You should have received a copy of the GNU Lesser General Public
0015: // License along with this library; if not, write to the Free Software
0016: // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
0017: //
0018: package net.sourceforge.jtds.jdbc;
0019:
0020: import java.io.DataInputStream;
0021: import java.io.DataOutputStream;
0022: import java.io.EOFException;
0023: import java.io.File;
0024: import java.io.IOException;
0025: import java.io.InputStream;
0026: import java.io.OutputStream;
0027: import java.io.RandomAccessFile;
0028: import java.lang.reflect.Constructor;
0029: import java.lang.reflect.InvocationTargetException;
0030: import java.lang.reflect.Method;
0031: import java.net.Socket;
0032: import java.net.SocketException;
0033: import java.net.UnknownHostException;
0034: import java.util.ArrayList;
0035: import java.util.LinkedList;
0036:
0037: import net.sourceforge.jtds.ssl.SocketFactories;
0038: import net.sourceforge.jtds.util.Logger;
0039:
0040: /**
0041: * This class manages the physical connection to the SQL Server and
0042: * serialises its use amongst a number of virtual sockets.
0043: * This allows one physical connection to service a number of concurrent
0044: * statements.
0045: * <p>
0046: * Constraints and assumptions:
0047: * <ol>
0048: * <li>Callers will not attempt to read from the server without issuing a request first.
0049: * <li>The end of a server reply can be identified as byte 2 of the header is non zero.
0050: * </ol>
0051: * </p>
0052: * Comments:
0053: * <ol>
0054: * <li>This code will discard unread server data if a new request is issued.
0055: * Currently the higher levels of the driver attempt to do this but may be
0056: * we can just rely on this code instead.
0057: * <li>A cancel can be issued by a caller only if the server is currently sending
0058: * data for the caller otherwise the cancel is ignored.
0059: * <li>Cancel packets on their own are returned as extra records appended to the
0060: * previous packet so that the TdsCore module can process them.
0061: * </ol>
0062: * This version of the class will start to cache results to disk once a predetermined
0063: * maximum buffer memory threshold has been passed. Small result sets that will fit
0064: * within a specified limit (default 8 packets) will continue to be held in memory
0065: * (even if the memory threshold has been passed) in the interests of efficiency.
0066: *
0067: * @author Mike Hutchinson.
0068: * @version $Id: SharedSocket.java,v 1.39 2007/07/08 21:38:13 bheineman Exp $
0069: */
0070: class SharedSocket {
0071: /**
0072: * This inner class contains the state information for the virtual socket.
0073: */
0074: private static class VirtualSocket {
0075: /**
0076: * The stream ID of the stream objects owning this state.
0077: */
0078: final int owner;
0079: /**
0080: * Memory resident packet queue.
0081: */
0082: final LinkedList pktQueue;
0083: /**
0084: * True to discard network data.
0085: */
0086: boolean flushInput;
0087: /**
0088: * True if output is complete TDS packet.
0089: */
0090: boolean complete;
0091: /**
0092: * File object for disk packet queue.
0093: */
0094: File queueFile;
0095: /**
0096: * I/O Stream for disk packet queue.
0097: */
0098: RandomAccessFile diskQueue;
0099: /**
0100: * Number of packets cached to disk.
0101: */
0102: int pktsOnDisk;
0103: /**
0104: * Total of input packets in memory or disk.
0105: */
0106: int inputPkts;
0107:
0108: /**
0109: * Constuct object to hold state information for each caller.
0110: * @param streamId the Response/Request stream id.
0111: */
0112: VirtualSocket(int streamId) {
0113: this .owner = streamId;
0114: this .pktQueue = new LinkedList();
0115: this .flushInput = false;
0116: this .complete = false;
0117: this .queueFile = null;
0118: this .diskQueue = null;
0119: this .pktsOnDisk = 0;
0120: this .inputPkts = 0;
0121: }
0122: }
0123:
0124: /**
0125: * The shared network socket.
0126: */
0127: private Socket socket;
0128: /**
0129: * The shared SSL network socket;
0130: */
0131: private Socket sslSocket;
0132: /**
0133: * Output stream for network socket.
0134: */
0135: private DataOutputStream out;
0136: /**
0137: * Input stream for network socket.
0138: */
0139: private DataInputStream in;
0140: /**
0141: * Current maxium input buffer size.
0142: */
0143: private int maxBufSize = TdsCore.MIN_PKT_SIZE;
0144: /**
0145: * Table of stream objects sharing this socket.
0146: */
0147: private final ArrayList socketTable = new ArrayList();
0148: /**
0149: * The Stream ID of the object that is expecting a response from the server.
0150: */
0151: private int responseOwner = -1;
0152: /**
0153: * Buffer for packet header.
0154: */
0155: private final byte hdrBuf[] = new byte[TDS_HDR_LEN];
0156: /**
0157: * The directory to buffer data to.
0158: */
0159: private final File bufferDir;
0160: /**
0161: * Total memory usage in all instances of the driver
0162: * NB. Access to this field should probably be synchronized
0163: * but in practice lost updates will not matter much and I think
0164: * all VMs tend to do atomic saves to integer variables.
0165: */
0166: private static int globalMemUsage;
0167: /**
0168: * Peak memory usage for debug purposes.
0169: */
0170: private static int peakMemUsage;
0171: /**
0172: * Max memory limit to use for buffers.
0173: * Only when this limit is exceeded will the driver
0174: * start caching to disk.
0175: */
0176: private static int memoryBudget = 100000; // 100K
0177: /**
0178: * Minimum number of packets that will be cached in memory
0179: * before the driver tries to write to disk even if
0180: * memoryBudget has been exceeded.
0181: */
0182: private static int minMemPkts = 8;
0183: /**
0184: * Global flag to indicate that security constraints mean
0185: * that attempts to create work files will fail.
0186: */
0187: private static boolean securityViolation;
0188: /**
0189: * Tds protocol version
0190: */
0191: private int tdsVersion;
0192: /**
0193: * The servertype one of Driver.SQLSERVER or Driver.SYBASE
0194: */
0195: protected final int serverType;
0196: /**
0197: * The character set to use for converting strings to/from bytes.
0198: */
0199: private CharsetInfo charsetInfo;
0200: /**
0201: * Count of packets received.
0202: */
0203: private int packetCount;
0204: /**
0205: * The server host name.
0206: */
0207: private String host;
0208: /**
0209: * The server port number.
0210: */
0211: private int port;
0212: /**
0213: * A cancel packet is pending.
0214: */
0215: private boolean cancelPending;
0216: /**
0217: * Synchronization monitor for {@link #cancelPending} and
0218: * {@link #responseOwner}.
0219: */
0220: private Object cancelMonitor = new Object();
0221: /**
0222: * Buffer for TDS_DONE packets
0223: */
0224: private byte doneBuffer[] = new byte[TDS_DONE_LEN];
0225: /**
0226: * TDS done token.
0227: */
0228: private static final int TDS_DONE_TOKEN = 253;
0229: /**
0230: * Length of a TDS_DONE token.
0231: */
0232: private static final int TDS_DONE_LEN = 9;
0233: /**
0234: * Length of TDS packet header.
0235: */
0236: private static final int TDS_HDR_LEN = 8;
0237:
/**
 * Construct a <code>SharedSocket</code> without opening a network
 * connection; only the supplied settings are stored.
 *
 * @param bufferDir  the directory to buffer data to
 * @param tdsVersion the TDS protocol version
 * @param serverType the server type, one of Driver.SQLSERVER or
 *                   Driver.SYBASE
 */
protected SharedSocket(File bufferDir, int tdsVersion,
        int serverType) {
    this.serverType = serverType;
    this.tdsVersion = tdsVersion;
    this.bufferDir = bufferDir;
}
0244:
/**
 * Construct a <code>SharedSocket</code> object specifying host name and
 * port.
 * <p>
 * If the streams cannot be obtained or the socket options cannot be
 * applied, the freshly opened socket is closed before the exception is
 * propagated so that the connection is not leaked.
 *
 * @param connection the connection object
 * @throws IOException if socket open fails
 * @throws UnknownHostException if the server host cannot be resolved
 */
SharedSocket(ConnectionJDBC2 connection) throws IOException,
        UnknownHostException {
    this(connection.getBufferDir(), connection.getTdsVersion(),
            connection.getServerType());
    this.host = connection.getServerName();
    this.port = connection.getPortNumber();
    if (Driver.JDBC3) {
        this.socket = createSocketForJDBC3(connection);
    } else {
        this.socket = new Socket(this.host, this.port);
    }

    boolean initialised = false;
    try {
        setOut(new DataOutputStream(socket.getOutputStream()));
        setIn(new DataInputStream(socket.getInputStream()));
        this.socket.setTcpNoDelay(connection.getTcpNoDelay());
        // The connection property is multiplied by 1000 for setSoTimeout
        this.socket.setSoTimeout(connection.getSocketTimeout() * 1000);
        initialised = true;
    } finally {
        if (!initialised) {
            // The caller never receives a reference to this object, so
            // nobody else could close the socket for us.
            try {
                this.socket.close();
            } catch (IOException ignored) {
                // Best effort; the original failure is what gets reported
            }
        }
    }
}
0268:
0269: /**
0270: * Creates a {@link Socket} through reflection when {@link Driver#JDBC3}
0271: * is <code>true</code>. Reflection must be used to stay compatible
0272: * with JDK 1.3.
0273: *
0274: * @param connection the connection object
0275: * @return a socket open to the host and port with the given timeout
0276: * @throws IOException if socket open fails
0277: */
0278: private Socket createSocketForJDBC3(ConnectionJDBC2 connection)
0279: throws IOException {
0280: final String host = connection.getServerName();
0281: final int port = connection.getPortNumber();
0282: final int loginTimeout = connection.getLoginTimeout();
0283: final String bindAddress = connection.getBindAddress();
0284: try {
0285: // Create the Socket
0286: Constructor socketConstructor = Socket.class
0287: .getConstructor(new Class[] {});
0288: Socket socket = (Socket) socketConstructor
0289: .newInstance(new Object[] {});
0290:
0291: // Create the InetSocketAddress
0292: Constructor constructor = Class.forName(
0293: "java.net.InetSocketAddress").getConstructor(
0294: new Class[] { String.class, int.class });
0295: Object address = constructor.newInstance(new Object[] {
0296: host, new Integer(port) });
0297:
0298: // Call Socket.bind(SocketAddress) if bindAddress parameter is set
0299: if (bindAddress != null && bindAddress.length() > 0) {
0300: Object localBindAddress = constructor
0301: .newInstance(new Object[] { bindAddress,
0302: new Integer(0) });
0303: Method bind = Socket.class.getMethod("bind",
0304: new Class[] { Class
0305: .forName("java.net.SocketAddress") });
0306: bind.invoke(socket, new Object[] { localBindAddress });
0307: }
0308:
0309: // Call Socket.connect(InetSocketAddress, int)
0310: Method connect = Socket.class.getMethod("connect",
0311: new Class[] {
0312: Class.forName("java.net.SocketAddress"),
0313: int.class });
0314: connect.invoke(socket, new Object[] { address,
0315: new Integer(loginTimeout * 1000) });
0316:
0317: return socket;
0318: } catch (InvocationTargetException ite) {
0319: // Reflection was OK but invocation of socket.bind() or socket.connect()
0320: // has failed. Try to report the underlying reason
0321: Throwable cause = ite.getTargetException();
0322: if (cause instanceof IOException) {
0323: // OK was an IOException or subclass so just throw it
0324: throw (IOException) cause;
0325: }
0326: // Something else so return invocation exception anyway
0327: // (This should not normally occur)
0328: throw (IOException) Support.linkException(new IOException(
0329: "Could not create socket"), cause);
0330: } catch (Exception e) {
0331: // Reflection has failed for some reason e.g. security so
0332: // try to create a socket in the old way.
0333: return new Socket(host, port);
0334: }
0335: }
0336:
0337: /**
0338: * Enable TLS encryption by creating a TLS socket over the
0339: * existing TCP/IP network socket.
0340: *
0341: * @param ssl the SSL URL property value
0342: * @throws IOException if an I/O error occurs
0343: */
0344: void enableEncryption(String ssl) throws IOException {
0345: Logger.println("Enabling TLS encryption");
0346: sslSocket = SocketFactories.getSocketFactory(ssl, socket)
0347: .createSocket(getHost(), getPort());
0348: setOut(new DataOutputStream(sslSocket.getOutputStream()));
0349: setIn(new DataInputStream(sslSocket.getInputStream()));
0350: }
0351:
0352: /**
0353: * Disable TLS encryption and switch back to raw TCP/IP socket.
0354: *
0355: * @throws IOException if an I/O error occurs
0356: */
0357: void disableEncryption() throws IOException {
0358: Logger.println("Disabling TLS encryption");
0359: sslSocket.close();
0360: sslSocket = null;
0361: setOut(new DataOutputStream(socket.getOutputStream()));
0362: setIn(new DataInputStream(socket.getInputStream()));
0363: }
0364:
0365: /**
0366: * Set the character set descriptor to be used to translate byte arrays to
0367: * or from Strings.
0368: *
0369: * @param charsetInfo the character set descriptor
0370: */
0371: void setCharsetInfo(CharsetInfo charsetInfo) {
0372: this .charsetInfo = charsetInfo;
0373: }
0374:
0375: /**
0376: * Retrieve the character set descriptor used to translate byte arrays to
0377: * or from Strings.
0378: */
0379: CharsetInfo getCharsetInfo() {
0380: return charsetInfo;
0381: }
0382:
0383: /**
0384: * Retrieve the character set name used to translate byte arrays to
0385: * or from Strings.
0386: *
0387: * @return the character set name as a <code>String</code>
0388: */
0389: String getCharset() {
0390: return charsetInfo.getCharset();
0391: }
0392:
0393: /**
0394: * Obtain an instance of a server request stream for this socket.
0395: *
0396: * @param bufferSize the initial buffer size to be used by the
0397: * <code>RequestStream</code>
0398: * @param maxPrecision the maximum precision for numeric/decimal types
0399: * @return the server request stream as a <code>RequestStream</code>
0400: */
0401: RequestStream getRequestStream(int bufferSize, int maxPrecision) {
0402: synchronized (socketTable) {
0403: int id;
0404: for (id = 0; id < socketTable.size(); id++) {
0405: if (socketTable.get(id) == null) {
0406: break;
0407: }
0408: }
0409:
0410: VirtualSocket vsock = new VirtualSocket(id);
0411:
0412: if (id >= socketTable.size()) {
0413: socketTable.add(vsock);
0414: } else {
0415: socketTable.set(id, vsock);
0416: }
0417:
0418: return new RequestStream(this , id, bufferSize, maxPrecision);
0419: }
0420: }
0421:
0422: /**
0423: * Obtain an instance of a server response stream for this socket.
0424: * NB. getRequestStream() must be used first to obtain the RequestStream
0425: * needed as a parameter for this method.
0426: *
0427: * @param requestStream an existing server request stream object obtained
0428: * from this <code>SharedSocket</code>
0429: * @param bufferSize the initial buffer size to be used by the
0430: * <code>RequestStream</code>
0431: * @return the server response stream as a <code>ResponseStream</code>
0432: */
0433: ResponseStream getResponseStream(RequestStream requestStream,
0434: int bufferSize) {
0435: return new ResponseStream(this , requestStream.getStreamId(),
0436: bufferSize);
0437: }
0438:
0439: /**
0440: * Retrieve the TDS version that is active on the connection
0441: * supported by this socket.
0442: *
0443: * @return the TDS version as an <code>int</code>
0444: */
0445: int getTdsVersion() {
0446: return tdsVersion;
0447: }
0448:
0449: /**
0450: * Set the TDS version field.
0451: *
0452: * @param tdsVersion the TDS version as an <code>int</code>
0453: */
0454: protected void setTdsVersion(int tdsVersion) {
0455: this .tdsVersion = tdsVersion;
0456: }
0457:
0458: /**
0459: * Set the global buffer memory limit for all instances of this driver.
0460: *
0461: * @param memoryBudget the global memory budget
0462: */
0463: static void setMemoryBudget(int memoryBudget) {
0464: SharedSocket.memoryBudget = memoryBudget;
0465: }
0466:
0467: /**
0468: * Get the global buffer memory limit for all instancs of this driver.
0469: *
0470: * @return the memory limit as an <code>int</code>
0471: */
0472: static int getMemoryBudget() {
0473: return SharedSocket.memoryBudget;
0474: }
0475:
0476: /**
0477: * Set the minimum number of packets to cache in memory before
0478: * writing to disk.
0479: *
0480: * @param minMemPkts the minimum number of packets to cache
0481: */
0482: static void setMinMemPkts(int minMemPkts) {
0483: SharedSocket.minMemPkts = minMemPkts;
0484: }
0485:
0486: /**
0487: * Get the minimum number of memory cached packets.
0488: *
0489: * @return minimum memory packets as an <code>int</code>
0490: */
0491: static int getMinMemPkts() {
0492: return SharedSocket.minMemPkts;
0493: }
0494:
0495: /**
0496: * Get the connected status of this socket.
0497: *
0498: * @return <code>true</code> if the underlying socket is connected
0499: */
0500: boolean isConnected() {
0501: return this .socket != null;
0502: }
0503:
0504: /**
0505: * Send a TDS cancel packet to the server.
0506: *
0507: * @param streamId the <code>RequestStream</code> id
0508: * @return <code>boolean</code> true if a cancel is actually
0509: * issued by this method call.
0510: */
0511: boolean cancel(int streamId) {
0512: //
0513: // Need to synchronize packet send to avoid race conditions on
0514: // responsOwner and cancelPending
0515: //
0516: synchronized (cancelMonitor) {
0517: //
0518: // Only send if response pending for the caller.
0519: // Caller must have aquired connection mutex first.
0520: // NB. This method will not work with local named pipes
0521: // as this thread will be blocked in the write until the
0522: // reading thread has returned from the read.
0523: //
0524: if (responseOwner == streamId && !cancelPending) {
0525: try {
0526: //
0527: // Send a cancel packet.
0528: //
0529: cancelPending = true;
0530: byte[] cancel = new byte[TDS_HDR_LEN];
0531: cancel[0] = TdsCore.CANCEL_PKT;
0532: cancel[1] = 1;
0533: cancel[2] = 0;
0534: cancel[3] = 8;
0535: cancel[4] = 0;
0536: cancel[5] = 0;
0537: cancel[6] = (tdsVersion >= Driver.TDS70) ? (byte) 1
0538: : 0;
0539: cancel[7] = 0;
0540: getOut().write(cancel, 0, TDS_HDR_LEN);
0541: getOut().flush();
0542: if (Logger.isActive()) {
0543: Logger.logPacket(streamId, false, cancel);
0544: }
0545: return true;
0546: } catch (IOException e) {
0547: // Ignore error as network is probably dead anyway
0548: }
0549: }
0550: }
0551: return false;
0552: }
0553:
0554: /**
0555: * Close the socket and release all resources.
0556: *
0557: * @throws IOException if the socket close fails
0558: */
0559: void close() throws IOException {
0560: if (Logger.isActive()) {
0561: Logger.println("TdsSocket: Max buffer memory used = "
0562: + (peakMemUsage / 1024) + "KB");
0563: }
0564:
0565: synchronized (socketTable) {
0566: // See if any temporary files need deleting
0567: for (int i = 0; i < socketTable.size(); i++) {
0568: VirtualSocket vsock = (VirtualSocket) socketTable
0569: .get(i);
0570:
0571: if (vsock != null && vsock.diskQueue != null) {
0572: try {
0573: vsock.diskQueue.close();
0574: vsock.queueFile.delete();
0575: } catch (IOException ioe) {
0576: // Ignore errors
0577: }
0578: }
0579: }
0580: try {
0581: if (sslSocket != null) {
0582: sslSocket.close();
0583: sslSocket = null;
0584: }
0585: } finally {
0586: // Close physical socket
0587: if (socket != null) {
0588: socket.close();
0589: }
0590: }
0591: }
0592: }
0593:
0594: /**
0595: * Force close the socket causing any pending reads/writes to fail.
0596: * <p>
0597: * Used by the login timer to abort a login attempt.
0598: */
0599: void forceClose() {
0600: if (socket != null) {
0601: try {
0602: socket.close();
0603: } catch (IOException ioe) {
0604: // Ignore
0605: } finally {
0606: sslSocket = null;
0607: socket = null;
0608: }
0609: }
0610: }
0611:
0612: /**
0613: * Deallocate a stream linked to this socket.
0614: *
0615: * @param streamId the <code>ResponseStream</code> id
0616: */
0617: void closeStream(int streamId) {
0618: synchronized (socketTable) {
0619: VirtualSocket vsock = lookup(streamId);
0620:
0621: if (vsock.diskQueue != null) {
0622: try {
0623: vsock.diskQueue.close();
0624: vsock.queueFile.delete();
0625: } catch (IOException ioe) {
0626: // Ignore errors
0627: }
0628: }
0629:
0630: socketTable.set(streamId, null);
0631: }
0632: }
0633:
0634: /**
0635: * Send a network packet. If output for another virtual socket is
0636: * in progress this packet will be sent later.
0637: *
0638: * @param streamId the originating <code>RequestStream</code> object
0639: * @param buffer the data to send
0640: * @return the same buffer received if emptied or another buffer w/ the
0641: * same size if the incoming buffer is cached (to avoid copying)
0642: * @throws IOException if an I/O error occurs
0643: */
0644: byte[] sendNetPacket(int streamId, byte buffer[])
0645: throws IOException {
0646: synchronized (socketTable) {
0647:
0648: VirtualSocket vsock = lookup(streamId);
0649:
0650: while (vsock.inputPkts > 0) {
0651: //
0652: // There is unread data in the input buffers.
0653: // As we are sending another packet we can just discard it now.
0654: //
0655: if (Logger.isActive()) {
0656: Logger
0657: .println("TdsSocket: Unread data in input packet queue");
0658: }
0659: dequeueInput(vsock);
0660: }
0661:
0662: if (responseOwner != -1) {
0663: //
0664: // Complex case there is another stream's data in the network pipe
0665: // or we had our own incomplete request to discard first
0666: // Read and store other stream's data or flush our own.
0667: //
0668: VirtualSocket other = (VirtualSocket) socketTable
0669: .get(responseOwner);
0670: byte[] tmpBuf = null;
0671: boolean ourData = (other.owner == streamId);
0672: do {
0673: // Reuse the buffer if it's our data; we don't need it
0674: tmpBuf = readPacket(ourData ? tmpBuf : null);
0675:
0676: if (!ourData) {
0677: // We need to save this input as it belongs to
0678: // Another thread.
0679: enqueueInput(other, tmpBuf);
0680: } // Any of our input is discarded.
0681:
0682: } while (tmpBuf[1] == 0); // Read all data to complete TDS packet
0683: }
0684: //
0685: // At this point we know that we are able to send the first
0686: // or subsequent packet of a new request.
0687: //
0688: getOut().write(buffer, 0, getPktLen(buffer));
0689:
0690: if (buffer[1] != 0) {
0691: getOut().flush();
0692: // We are the response owner now
0693: responseOwner = streamId;
0694: }
0695:
0696: return buffer;
0697: }
0698: }
0699:
0700: /**
0701: * Get a network packet. This may be read from the network directly or from
0702: * previously cached buffers.
0703: *
0704: * @param streamId the originating ResponseStream object
0705: * @param buffer the data buffer to receive the object (may be replaced)
0706: * @return the data in a <code>byte[]</code> buffer
0707: * @throws IOException if an I/O error occurs
0708: */
0709: byte[] getNetPacket(int streamId, byte buffer[]) throws IOException {
0710: synchronized (socketTable) {
0711: VirtualSocket vsock = lookup(streamId);
0712:
0713: //
0714: // Return any cached input
0715: //
0716: if (vsock.inputPkts > 0) {
0717: return dequeueInput(vsock);
0718: }
0719:
0720: //
0721: // Nothing cached see if we are expecting network data
0722: //
0723: if (responseOwner == -1) {
0724: throw new IOException(
0725: "Stream "
0726: + streamId
0727: + " attempting to read when no request has been sent");
0728: }
0729: //
0730: // OK There should be data, check that it is for this stream
0731: //
0732: if (responseOwner != streamId) {
0733: // Error we are trying to read another thread's request.
0734: throw new IOException(
0735: "Stream "
0736: + streamId
0737: + " is trying to read data that belongs to stream "
0738: + responseOwner);
0739: }
0740: //
0741: // Simple case we are reading our input directly from the server
0742: //
0743: return readPacket(buffer);
0744: }
0745: }
0746:
0747: /**
0748: * Save a packet buffer in a memory queue or to a disk queue if the global
0749: * memory limit for the driver has been exceeded.
0750: *
0751: * @param vsock the virtual socket owning this data
0752: * @param buffer the data to queue
0753: */
0754: private void enqueueInput(VirtualSocket vsock, byte[] buffer)
0755: throws IOException {
0756: //
0757: // Check to see if we should start caching to disk
0758: //
0759: if (globalMemUsage + buffer.length > memoryBudget
0760: && vsock.pktQueue.size() >= minMemPkts
0761: && !securityViolation && vsock.diskQueue == null) {
0762: // Try to create a disk file for the queue
0763: try {
0764: vsock.queueFile = File.createTempFile("jtds", ".tmp",
0765: bufferDir);
0766: vsock.queueFile.deleteOnExit();
0767: vsock.diskQueue = new RandomAccessFile(vsock.queueFile,
0768: "rw");
0769:
0770: // Write current cache contents to disk and free memory
0771: byte[] tmpBuf;
0772:
0773: while (vsock.pktQueue.size() > 0) {
0774: tmpBuf = (byte[]) vsock.pktQueue.removeFirst();
0775: vsock.diskQueue.write(tmpBuf, 0, getPktLen(tmpBuf));
0776: vsock.pktsOnDisk++;
0777: }
0778: } catch (java.lang.SecurityException se) {
0779: // Not allowed to cache to disk so carry on in memory
0780: securityViolation = true;
0781: vsock.queueFile = null;
0782: vsock.diskQueue = null;
0783: }
0784: }
0785:
0786: if (vsock.diskQueue != null) {
0787: // Cache file exists so append buffer to it
0788: vsock.diskQueue.write(buffer, 0, getPktLen(buffer));
0789: vsock.pktsOnDisk++;
0790: } else {
0791: // Will cache in memory
0792: vsock.pktQueue.addLast(buffer);
0793: globalMemUsage += buffer.length;
0794:
0795: if (globalMemUsage > peakMemUsage) {
0796: peakMemUsage = globalMemUsage;
0797: }
0798: }
0799:
0800: vsock.inputPkts++;
0801: }
0802:
0803: /**
0804: * Read a cached packet from the in memory queue or from a disk based queue.
0805: *
0806: * @param vsock the virtual socket owning this data
0807: * @return a buffer containing the packet
0808: */
0809: private byte[] dequeueInput(VirtualSocket vsock) throws IOException {
0810: byte[] buffer = null;
0811:
0812: if (vsock.pktsOnDisk > 0) {
0813: // Data is cached on disk
0814: if (vsock.diskQueue.getFilePointer() == vsock.diskQueue
0815: .length()) {
0816: // First read so rewind() file
0817: vsock.diskQueue.seek(0L);
0818: }
0819:
0820: vsock.diskQueue.readFully(hdrBuf, 0, TDS_HDR_LEN);
0821:
0822: int len = getPktLen(hdrBuf);
0823:
0824: buffer = new byte[len];
0825: System.arraycopy(hdrBuf, 0, buffer, 0, TDS_HDR_LEN);
0826: vsock.diskQueue.readFully(buffer, TDS_HDR_LEN, len
0827: - TDS_HDR_LEN);
0828: vsock.pktsOnDisk--;
0829:
0830: if (vsock.pktsOnDisk < 1) {
0831: // File now empty so close and delete it
0832: try {
0833: vsock.diskQueue.close();
0834: vsock.queueFile.delete();
0835: } finally {
0836: vsock.queueFile = null;
0837: vsock.diskQueue = null;
0838: }
0839: }
0840: } else if (vsock.pktQueue.size() > 0) {
0841: buffer = (byte[]) vsock.pktQueue.removeFirst();
0842: globalMemUsage -= buffer.length;
0843: }
0844:
0845: if (buffer != null) {
0846: vsock.inputPkts--;
0847: }
0848:
0849: return buffer;
0850: }
0851:
0852: /**
0853: * Read a physical TDS packet from the network.
0854: *
0855: * @param buffer a buffer to read the data into (if it fits) or null
0856: * @return either the incoming buffer if it was large enough or a newly
0857: * allocated buffer with the read packet
0858: */
0859: private byte[] readPacket(byte buffer[]) throws IOException {
0860: //
0861: // Read rest of header
0862: try {
0863: getIn().readFully(hdrBuf);
0864: } catch (EOFException e) {
0865: throw new IOException("DB server closed connection.");
0866: }
0867:
0868: byte packetType = hdrBuf[0];
0869:
0870: if (packetType != TdsCore.LOGIN_PKT
0871: && packetType != TdsCore.QUERY_PKT
0872: && packetType != TdsCore.REPLY_PKT) {
0873: throw new IOException("Unknown packet type 0x"
0874: + Integer.toHexString(packetType & 0xFF));
0875: }
0876:
0877: // figure out how many bytes are remaining in this packet.
0878: int len = getPktLen(hdrBuf);
0879:
0880: if (len < TDS_HDR_LEN || len > 65536) {
0881: throw new IOException("Invalid network packet length "
0882: + len);
0883: }
0884:
0885: if (buffer == null || len > buffer.length) {
0886: // Create or expand the buffer as required
0887: buffer = new byte[len];
0888:
0889: if (len > maxBufSize) {
0890: maxBufSize = len;
0891: }
0892: }
0893:
0894: // Preserve the packet header in the buffer
0895: System.arraycopy(hdrBuf, 0, buffer, 0, TDS_HDR_LEN);
0896:
0897: try {
0898: getIn().readFully(buffer, TDS_HDR_LEN, len - TDS_HDR_LEN);
0899: } catch (EOFException e) {
0900: throw new IOException("DB server closed connection.");
0901: }
0902:
0903: //
0904: // SQL Server 2000 < SP3 does not set the last packet
0905: // flag in the NT challenge packet.
0906: // If this is the first packet and the length is correct
0907: // force the last packet flag on.
0908: //
0909: if (++packetCount == 1 && serverType == Driver.SQLSERVER
0910: && "NTLMSSP".equals(new String(buffer, 11, 7))) {
0911: buffer[1] = 1;
0912: }
0913:
0914: synchronized (cancelMonitor) {
0915: //
0916: // If a cancel request is outstanding check that the last TDS packet
0917: // is a TDS_DONE with the "cancek ACK" flag set. If it isn't set the
0918: // "more packets" flag; this will ensure that the stream keeps
0919: // processing until the "cancel ACK" is processed.
0920: //
0921: if (cancelPending) {
0922: //
0923: // Move what we assume to be the TDS_DONE packet into doneBuffer
0924: //
0925: if (len >= TDS_DONE_LEN + TDS_HDR_LEN) {
0926: System.arraycopy(buffer, len - TDS_DONE_LEN,
0927: doneBuffer, 0, TDS_DONE_LEN);
0928: } else {
0929: // Packet too short so TDS_DONE record was split over
0930: // two packets. Need to reassemble.
0931: int frag = len - TDS_HDR_LEN;
0932: System.arraycopy(doneBuffer, frag, doneBuffer, 0,
0933: TDS_DONE_LEN - frag);
0934: System.arraycopy(buffer, TDS_HDR_LEN, doneBuffer,
0935: TDS_DONE_LEN - frag, frag);
0936: }
0937: //
0938: // If this is the last packet and there is a cancel pending see
0939: // if the last packet contains a TDS_DONE token with the cancel
0940: // ACK set. If not reset the last packet flag so that the dedicated
0941: // cancel packet is also read and processed.
0942: //
0943: if (buffer[1] == 1) {
0944: if ((doneBuffer[0] & 0xFF) < TDS_DONE_TOKEN) {
0945: throw new IOException(
0946: "Expecting a TDS_DONE or TDS_DONEPROC.");
0947: }
0948:
0949: if ((doneBuffer[1] & TdsCore.DONE_CANCEL) != 0) {
0950: // OK have a cancel ACK packet
0951: cancelPending = false;
0952: } else {
0953: // Must be in next packet so
0954: // force client to read next packet
0955: buffer[1] = 0;
0956: }
0957: }
0958: }
0959:
0960: if (buffer[1] != 0) {
0961: // End of response; connection now free
0962: responseOwner = -1;
0963: }
0964: }
0965:
0966: return buffer;
0967: }
0968:
0969: /**
0970: * Retrieves the virtual socket with the given id.
0971: *
0972: * @param streamId id of the virtual socket to retrieve
0973: */
0974: private VirtualSocket lookup(int streamId) {
0975: if (streamId < 0 || streamId > socketTable.size()) {
0976: throw new IllegalArgumentException(
0977: "Invalid parameter stream ID " + streamId);
0978: }
0979:
0980: VirtualSocket vsock = (VirtualSocket) socketTable.get(streamId);
0981:
0982: if (vsock.owner != streamId) {
0983: throw new IllegalStateException(
0984: "Internal error: bad stream ID " + streamId);
0985: }
0986:
0987: return vsock;
0988: }
0989:
0990: /**
0991: * Convert two bytes (in network byte order) in a byte array into a Java
0992: * short integer.
0993: *
0994: * @param buf array of data
0995: * @return the 16 bit unsigned value as an <code>int</code>
0996: */
0997: static int getPktLen(byte buf[]) {
0998: int lo = ((int) buf[3] & 0xff);
0999: int hi = (((int) buf[2] & 0xff) << 8);
1000:
1001: return hi | lo;
1002: }
1003:
1004: /**
1005: * Set the socket timeout.
1006: *
1007: * @param timeout the timeout value in milliseconds
1008: */
1009: protected void setTimeout(int timeout) throws SocketException {
1010: socket.setSoTimeout(timeout);
1011: }
1012:
1013: /**
1014: * Getter for {@link SharedSocket#in} field.
1015: *
1016: * @return {@link InputStream} used for communication
1017: */
1018: protected DataInputStream getIn() {
1019: return in;
1020: }
1021:
1022: /**
1023: * Setter for {@link SharedSocket#in} field.
1024: *
1025: * @param in the {@link InputStream} to be used for communication
1026: */
1027: protected void setIn(DataInputStream in) {
1028: this .in = in;
1029: }
1030:
1031: /**
1032: * Getter for {@link SharedSocket#out} field.
1033: *
1034: * @return {@link OutputStream} used for communication
1035: */
1036: protected DataOutputStream getOut() {
1037: return out;
1038: }
1039:
1040: /**
1041: * Setter for {@link SharedSocket#out} field.
1042: *
1043: * @param out the {@link OutputStream} to be used for communication
1044: */
1045: protected void setOut(DataOutputStream out) {
1046: this .out = out;
1047: }
1048:
1049: /**
1050: * Get the server host name.
1051: *
1052: * @return the host name as a <code>String</code>
1053: */
1054: protected String getHost() {
1055: return this .host;
1056: }
1057:
1058: /**
1059: * Get the server port number.
1060: *
1061: * @return the host port as an <code>int</code>
1062: */
1063: protected int getPort() {
1064: return this.port;
1065: }
1066: }
|