001: /*
002: Copyright (C) 2002-2004 MySQL AB
003:
004: This program is free software; you can redistribute it and/or modify
005: it under the terms of version 2 of the GNU General Public License as
006: published by the Free Software Foundation.
007:
008: There are special exceptions to the terms and conditions of the GPL
009: as it is applied to this software. View the full text of the
010: exception in file EXCEPTIONS-CONNECTOR-J in the directory of this
011: software distribution.
012:
013: This program is distributed in the hope that it will be useful,
014: but WITHOUT ANY WARRANTY; without even the implied warranty of
015: MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
016: GNU General Public License for more details.
017:
018: You should have received a copy of the GNU General Public License
019: along with this program; if not, write to the Free Software
020: Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
021:
022:
023:
024: */
025: package com.mysql.jdbc;
026:
027: import java.io.EOFException;
028: import java.io.IOException;
029: import java.io.InputStream;
030:
031: import java.sql.SQLException;
032:
033: import java.util.zip.DataFormatException;
034: import java.util.zip.Inflater;
035:
036: /**
037: * Used to de-compress packets from the MySQL server when protocol-level
038: * compression is turned on.
039: *
040: * @author Mark Matthews
041: *
042: * @version $Id: CompressedInputStream.java,v 1.1.2.1 2005/05/13 18:58:37
043: * mmatthews Exp $
044: */
045: class CompressedInputStream extends InputStream {
046: /** The packet data after it has been un-compressed */
047: private byte[] buffer;
048:
049: /** The connection that is using us (used to read config values) */
050: private Connection connection;
051:
052: /** The stream we are reading from the server */
053: private InputStream in;
054:
055: /** The ZIP inflater used to un-compress packets */
056: private Inflater inflater;
057:
058: /**
059: * The buffer to read packet headers into
060: */
061: private byte[] packetHeaderBuffer = new byte[7];
062:
063: /** The position we are reading from */
064: private int pos = 0;
065:
066: /**
067: * Creates a new CompressedInputStream that reads the given stream from the
068: * server.
069: *
070: * @param conn
071: * DOCUMENT ME!
072: * @param streamFromServer
073: */
074: public CompressedInputStream(Connection conn,
075: InputStream streamFromServer) {
076: this .connection = conn;
077: this .in = streamFromServer;
078: this .inflater = new Inflater();
079: }
080:
081: /**
082: * @see java.io.InputStream#available()
083: */
084: public int available() throws IOException {
085: if (this .buffer == null) {
086: return this .in.available();
087: }
088:
089: return this .buffer.length - this .pos + this .in.available();
090: }
091:
092: /**
093: * @see java.io.InputStream#close()
094: */
095: public void close() throws IOException {
096: this .in.close();
097: this .buffer = null;
098: this .inflater = null;
099: }
100:
101: /**
102: * Retrieves and un-compressed (if necessary) the next packet from the
103: * server.
104: *
105: * @throws IOException
106: * if an I/O error occurs
107: */
108: private void getNextPacketFromServer() throws IOException {
109: byte[] uncompressedData = null;
110:
111: int lengthRead = readFully(this .packetHeaderBuffer, 0, 7);
112:
113: if (lengthRead < 7) {
114: throw new IOException("Unexpected end of input stream");
115: }
116:
117: int compressedPacketLength = ((this .packetHeaderBuffer[0] & 0xff))
118: + (((this .packetHeaderBuffer[1] & 0xff)) << 8)
119: + (((this .packetHeaderBuffer[2] & 0xff)) << 16);
120:
121: int uncompressedLength = ((this .packetHeaderBuffer[4] & 0xff))
122: + (((this .packetHeaderBuffer[5] & 0xff)) << 8)
123: + (((this .packetHeaderBuffer[6] & 0xff)) << 16);
124:
125: if (this .connection.getTraceProtocol()) {
126: try {
127: this .connection.getLog().logTrace(
128: "Reading compressed packet of length "
129: + compressedPacketLength
130: + " uncompressed to "
131: + uncompressedLength);
132: } catch (SQLException sqlEx) {
133: throw new IOException(sqlEx.toString()); // should never
134: // happen
135: }
136: }
137:
138: if (uncompressedLength > 0) {
139: uncompressedData = new byte[uncompressedLength];
140:
141: byte[] compressedBuffer = new byte[compressedPacketLength];
142:
143: readFully(compressedBuffer, 0, compressedPacketLength);
144:
145: try {
146: this .inflater.reset();
147: } catch (NullPointerException npe) {
148: this .inflater = new Inflater();
149: }
150:
151: this .inflater.setInput(compressedBuffer);
152:
153: try {
154: this .inflater.inflate(uncompressedData);
155: } catch (DataFormatException dfe) {
156: throw new IOException(
157: "Error while uncompressing packet from server.");
158: }
159:
160: this .inflater.end();
161: } else {
162: if (this .connection.getTraceProtocol()) {
163: try {
164: this .connection
165: .getLog()
166: .logTrace(
167: "Packet didn't meet compression threshold, not uncompressing...");
168: } catch (SQLException sqlEx) {
169: throw new IOException(sqlEx.toString()); // should never
170: // happen
171: }
172: }
173:
174: //
175: // Read data, note this this code is reached when using
176: // compressed packets that have not been compressed, as well
177: //
178: uncompressedData = new byte[compressedPacketLength];
179: readFully(uncompressedData, 0, compressedPacketLength);
180: }
181:
182: if (this .connection.getTraceProtocol()) {
183: try {
184: this .connection.getLog().logTrace(
185: "Uncompressed packet: \n"
186: + StringUtils.dumpAsHex(
187: uncompressedData,
188: compressedPacketLength));
189: } catch (SQLException sqlEx) {
190: throw new IOException(sqlEx.toString()); // should never
191: // happen
192: }
193: }
194:
195: if ((this .buffer != null) && (this .pos < this .buffer.length)) {
196: if (this .connection.getTraceProtocol()) {
197: try {
198: this .connection.getLog().logTrace(
199: "Combining remaining packet with new: ");
200: } catch (SQLException sqlEx) {
201: throw new IOException(sqlEx.toString()); // should never
202: // happen
203: }
204: }
205:
206: int remaining = this .buffer.length - this .pos;
207: byte[] newBuffer = new byte[remaining
208: + uncompressedData.length];
209:
210: int newIndex = 0;
211:
212: for (int i = this .pos; i < this .buffer.length; i++)
213: newBuffer[newIndex++] = this .buffer[i];
214:
215: System.arraycopy(uncompressedData, 0, newBuffer, newIndex,
216: uncompressedData.length);
217:
218: uncompressedData = newBuffer;
219: }
220:
221: this .pos = 0;
222: this .buffer = uncompressedData;
223:
224: return;
225: }
226:
227: /**
228: * Determines if another packet needs to be read from the server to be able
229: * to read numBytes from the stream.
230: *
231: * @param numBytes
232: * the number of bytes to be read
233: *
234: * @throws IOException
235: * if an I/O error occors.
236: */
237: private void getNextPacketIfRequired(int numBytes)
238: throws IOException {
239: if ((this .buffer == null)
240: || ((this .pos + numBytes) > this .buffer.length)) {
241: getNextPacketFromServer();
242: }
243: }
244:
245: /**
246: * @see java.io.InputStream#read()
247: */
248: public int read() throws IOException {
249: try {
250: getNextPacketIfRequired(1);
251: } catch (IOException ioEx) {
252: return -1;
253: }
254:
255: return this .buffer[this .pos++] & 0xff;
256: }
257:
258: /**
259: * @see java.io.InputStream#read(byte)
260: */
261: public int read(byte[] b) throws IOException {
262: return read(b, 0, b.length);
263: }
264:
265: /**
266: * @see java.io.InputStream#read(byte, int, int)
267: */
268: public int read(byte[] b, int off, int len) throws IOException {
269: if (b == null) {
270: throw new NullPointerException();
271: } else if ((off < 0) || (off > b.length) || (len < 0)
272: || ((off + len) > b.length) || ((off + len) < 0)) {
273: throw new IndexOutOfBoundsException();
274: }
275:
276: if (len <= 0) {
277: return 0;
278: }
279:
280: try {
281: getNextPacketIfRequired(len);
282: } catch (IOException ioEx) {
283: return -1;
284: }
285:
286: System.arraycopy(this .buffer, this .pos, b, off, len);
287: this .pos += len;
288:
289: return len;
290: }
291:
292: private final int readFully(byte[] b, int off, int len)
293: throws IOException {
294: if (len < 0) {
295: throw new IndexOutOfBoundsException();
296: }
297:
298: int n = 0;
299:
300: while (n < len) {
301: int count = this .in.read(b, off + n, len - n);
302:
303: if (count < 0) {
304: throw new EOFException();
305: }
306:
307: n += count;
308: }
309:
310: return n;
311: }
312:
313: /**
314: * @see java.io.InputStream#skip(long)
315: */
316: public long skip(long n) throws IOException {
317: long count = 0;
318:
319: for (long i = 0; i < n; i++) {
320: int bytesRead = read();
321:
322: if (bytesRead == -1) {
323: break;
324: }
325:
326: count++;
327: }
328:
329: return count;
330: }
331: }
|