001: /*-------------------------------------------------------------------------
002: *
003: * Copyright (c) 2003-2005, PostgreSQL Global Development Group
004: *
005: * IDENTIFICATION
006: * $PostgreSQL: pgjdbc/org/postgresql/core/PGStream.java,v 1.21 2007/02/28 06:10:59 jurka Exp $
007: *
008: *-------------------------------------------------------------------------
009: */
010: package org.postgresql.core;
011:
012: import java.io.BufferedOutputStream;
013: import java.io.InputStream;
014: import java.io.OutputStream;
015: import java.io.FilterOutputStream;
016: import java.io.IOException;
017: import java.io.EOFException;
018: import java.io.Writer;
019: import java.net.Socket;
020: import java.sql.*;
021:
022: import org.postgresql.util.GT;
023: import org.postgresql.util.PSQLState;
024: import org.postgresql.util.PSQLException;
025:
026: /**
027: * Wrapper around the raw connection to the server that implements some basic
028: * primitives (reading/writing formatted data, doing string encoding, etc).
029: *<p>
030: * In general, instances of PGStream are not threadsafe; the caller must ensure
031: * that only one thread at a time is accessing a particular PGStream instance.
032: */
033: public class PGStream {
034: private final String host;
035: private final int port;
036:
037: private final byte[] _int4buf;
038: private final byte[] _int2buf;
039:
040: private Socket connection;
041: private VisibleBufferedInputStream pg_input;
042: private OutputStream pg_output;
043: private byte[] streamBuffer;
044:
045: private Encoding encoding;
046: private Writer encodingWriter;
047:
048: /**
049: * Constructor: Connect to the PostgreSQL back end and return
050: * a stream connection.
051: *
052: * @param host the hostname to connect to
053: * @param port the port number that the postmaster is sitting on
054: * @exception IOException if an IOException occurs below it.
055: */
056: public PGStream(String host, int port) throws IOException {
057: this .host = host;
058: this .port = port;
059:
060: changeSocket(new Socket(host, port));
061: setEncoding(Encoding.getJVMEncoding("US-ASCII"));
062:
063: _int2buf = new byte[2];
064: _int4buf = new byte[4];
065: }
066:
067: public String getHost() {
068: return host;
069: }
070:
071: public int getPort() {
072: return port;
073: }
074:
075: public Socket getSocket() {
076: return connection;
077: }
078:
079: /**
080: * Check for pending backend messages without blocking.
081: * Might return false when there actually are messages
082: * waiting, depending on the characteristics of the
083: * underlying socket. This is used to detect asynchronous
084: * notifies from the backend, when available.
085: *
086: * @return true if there is a pending backend message
087: */
088: public boolean hasMessagePending() throws IOException {
089: return pg_input.available() > 0
090: || connection.getInputStream().available() > 0;
091: }
092:
093: /**
094: * Switch this stream to using a new socket. Any existing socket
095: * is <em>not</em> closed; it's assumed that we are changing to
096: * a new socket that delegates to the original socket (e.g. SSL).
097: *
098: * @param socket the new socket to change to
099: * @throws IOException if something goes wrong
100: */
101: public void changeSocket(Socket socket) throws IOException {
102: this .connection = socket;
103:
104: // Submitted by Jason Venner <jason@idiom.com>. Disable Nagle
105: // as we are selective about flushing output only when we
106: // really need to.
107: connection.setTcpNoDelay(true);
108:
109: // Buffer sizes submitted by Sverre H Huseby <sverrehu@online.no>
110: pg_input = new VisibleBufferedInputStream(connection
111: .getInputStream(), 8192);
112: pg_output = new BufferedOutputStream(connection
113: .getOutputStream(), 8192);
114:
115: if (encoding != null)
116: setEncoding(encoding);
117: }
118:
119: public Encoding getEncoding() {
120: return encoding;
121: }
122:
123: /**
124: * Change the encoding used by this connection.
125: *
126: * @param encoding the new encoding to use
127: * @throws IOException if something goes wrong
128: */
129: public void setEncoding(Encoding encoding) throws IOException {
130: // Close down any old writer.
131: if (encodingWriter != null)
132: encodingWriter.close();
133:
134: this .encoding = encoding;
135:
136: // Intercept flush() downcalls from the writer; our caller
137: // will call PGStream.flush() as needed.
138: OutputStream interceptor = new FilterOutputStream(pg_output) {
139: public void flush() throws IOException {
140: }
141:
142: public void close() throws IOException {
143: super .flush();
144: }
145: };
146:
147: encodingWriter = encoding.getEncodingWriter(interceptor);
148: }
149:
150: /**
151: * Get a Writer instance that encodes directly onto the underlying stream.
152: *<p>
153: * The returned Writer should not be closed, as it's a shared object.
154: * Writer.flush needs to be called when switching between use of the Writer and
155: * use of the PGStream write methods, but it won't actually flush output
156: * all the way out -- call {@link #flush} to actually ensure all output
157: * has been pushed to the server.
158: *
159: * @return the shared Writer instance
160: * @throws IOException if something goes wrong.
161: */
162: public Writer getEncodingWriter() throws IOException {
163: if (encodingWriter == null)
164: throw new IOException(
165: "No encoding has been set on this connection");
166: return encodingWriter;
167: }
168:
169: /**
170: * Sends a single character to the back end
171: *
172: * @param val the character to be sent
173: * @exception IOException if an I/O error occurs
174: */
175: public void SendChar(int val) throws IOException {
176: pg_output.write(val);
177: }
178:
179: /**
180: * Sends a 4-byte integer to the back end
181: *
182: * @param val the integer to be sent
183: * @exception IOException if an I/O error occurs
184: */
185: public void SendInteger4(int val) throws IOException {
186: _int4buf[0] = (byte) (val >>> 24);
187: _int4buf[1] = (byte) (val >>> 16);
188: _int4buf[2] = (byte) (val >>> 8);
189: _int4buf[3] = (byte) (val);
190: pg_output.write(_int4buf);
191: }
192:
193: /**
194: * Sends a 2-byte integer (short) to the back end
195: *
196: * @param val the integer to be sent
197: * @exception IOException if an I/O error occurs or <code>val</code> cannot be encoded in 2 bytes
198: */
199: public void SendInteger2(int val) throws IOException {
200: if (val < Short.MIN_VALUE || val > Short.MAX_VALUE)
201: throw new IOException(
202: "Tried to send an out-of-range integer as a 2-byte value: "
203: + val);
204:
205: _int2buf[0] = (byte) (val >>> 8);
206: _int2buf[1] = (byte) val;
207: pg_output.write(_int2buf);
208: }
209:
210: /**
211: * Send an array of bytes to the backend
212: *
213: * @param buf The array of bytes to be sent
214: * @exception IOException if an I/O error occurs
215: */
216: public void Send(byte buf[]) throws IOException {
217: pg_output.write(buf);
218: }
219:
220: /**
221: * Send a fixed-size array of bytes to the backend. If buf.length < siz,
222: * pad with zeros. If buf.lengh > siz, truncate the array.
223: *
224: * @param buf the array of bytes to be sent
225: * @param siz the number of bytes to be sent
226: * @exception IOException if an I/O error occurs
227: */
228: public void Send(byte buf[], int siz) throws IOException {
229: Send(buf, 0, siz);
230: }
231:
232: /**
233: * Send a fixed-size array of bytes to the backend. If length < siz,
234: * pad with zeros. If length > siz, truncate the array.
235: *
236: * @param buf the array of bytes to be sent
237: * @param off offset in the array to start sending from
238: * @param siz the number of bytes to be sent
239: * @exception IOException if an I/O error occurs
240: */
241: public void Send(byte buf[], int off, int siz) throws IOException {
242: int bufamt = buf.length - off;
243: pg_output.write(buf, off, bufamt < siz ? bufamt : siz);
244: for (int i = bufamt; i < siz; ++i) {
245: pg_output.write(0);
246: }
247: }
248:
249: /**
250: * Receives a single character from the backend
251: *
252: * @return the character received
253: * @exception IOException if an I/O Error occurs
254: */
255: public int ReceiveChar() throws IOException {
256: int c = pg_input.read();
257: if (c < 0)
258: throw new EOFException();
259: return c;
260: }
261:
262: /**
263: * Receives a four byte integer from the backend
264: *
265: * @return the integer received from the backend
266: * @exception IOException if an I/O error occurs
267: */
268: public int ReceiveInteger4() throws IOException {
269: if (pg_input.read(_int4buf) != 4)
270: throw new EOFException();
271:
272: return (_int4buf[0] & 0xFF) << 24 | (_int4buf[1] & 0xFF) << 16
273: | (_int4buf[2] & 0xFF) << 8 | _int4buf[3] & 0xFF;
274: }
275:
276: /**
277: * Receives a two byte integer from the backend
278: *
279: * @return the integer received from the backend
280: * @exception IOException if an I/O error occurs
281: */
282: public int ReceiveInteger2() throws IOException {
283: if (pg_input.read(_int2buf) != 2)
284: throw new EOFException();
285:
286: return (_int2buf[0] & 0xFF) << 8 | _int2buf[1] & 0xFF;
287: }
288:
289: /**
290: * Receives a fixed-size string from the backend.
291: *
292: * @param len the length of the string to receive, in bytes.
293: * @return the decoded string
294: */
295: public String ReceiveString(int len) throws IOException {
296: if (!pg_input.ensureBytes(len)) {
297: throw new EOFException();
298: }
299:
300: String res = encoding.decode(pg_input.getBuffer(), pg_input
301: .getIndex(), len);
302: pg_input.skip(len);
303: return res;
304: }
305:
306: /**
307: * Receives a null-terminated string from the backend. If we don't see a
308: * null, then we assume something has gone wrong.
309: *
310: * @return string from back end
311: * @exception IOException if an I/O error occurs, or end of file
312: */
313: public String ReceiveString() throws IOException {
314: int len = pg_input.scanCStringLength();
315: String res = encoding.decode(pg_input.getBuffer(), pg_input
316: .getIndex(), len - 1);
317: pg_input.skip(len);
318: return res;
319: }
320:
321: /**
322: * Read a tuple from the back end. A tuple is a two dimensional
323: * array of bytes. This variant reads the V3 protocol's tuple
324: * representation.
325: *
326: * @return null if the current response has no more tuples, otherwise
327: * an array of bytearrays
328: * @exception IOException if a data I/O error occurs
329: */
330: public byte[][] ReceiveTupleV3() throws IOException,
331: OutOfMemoryError {
332: //TODO: use l_msgSize
333: int l_msgSize = ReceiveInteger4();
334: int i;
335: int l_nf = ReceiveInteger2();
336: byte[][] answer = new byte[l_nf][];
337:
338: OutOfMemoryError oom = null;
339: for (i = 0; i < l_nf; ++i) {
340: int l_size = ReceiveInteger4();
341: if (l_size != -1) {
342: try {
343: answer[i] = new byte[l_size];
344: Receive(answer[i], 0, l_size);
345: } catch (OutOfMemoryError oome) {
346: oom = oome;
347: Skip(l_size);
348: }
349: }
350: }
351:
352: if (oom != null)
353: throw oom;
354:
355: return answer;
356: }
357:
358: /**
359: * Read a tuple from the back end. A tuple is a two dimensional
360: * array of bytes. This variant reads the V2 protocol's tuple
361: * representation.
362: *
363: * @param nf the number of fields expected
364: * @param bin true if the tuple is a binary tuple
365: * @return null if the current response has no more tuples, otherwise
366: * an array of bytearrays
367: * @exception IOException if a data I/O error occurs
368: */
369: public byte[][] ReceiveTupleV2(int nf, boolean bin)
370: throws IOException, OutOfMemoryError {
371: int i, bim = (nf + 7) / 8;
372: byte[] bitmask = Receive(bim);
373: byte[][] answer = new byte[nf][];
374:
375: int whichbit = 0x80;
376: int whichbyte = 0;
377:
378: OutOfMemoryError oom = null;
379: for (i = 0; i < nf; ++i) {
380: boolean isNull = ((bitmask[whichbyte] & whichbit) == 0);
381: whichbit >>= 1;
382: if (whichbit == 0) {
383: ++whichbyte;
384: whichbit = 0x80;
385: }
386: if (!isNull) {
387: int len = ReceiveInteger4();
388: if (!bin)
389: len -= 4;
390: if (len < 0)
391: len = 0;
392: try {
393: answer[i] = new byte[len];
394: Receive(answer[i], 0, len);
395: } catch (OutOfMemoryError oome) {
396: oom = oome;
397: Skip(len);
398: }
399: }
400: }
401:
402: if (oom != null)
403: throw oom;
404:
405: return answer;
406: }
407:
408: /**
409: * Reads in a given number of bytes from the backend
410: *
411: * @param siz number of bytes to read
412: * @return array of bytes received
413: * @exception IOException if a data I/O error occurs
414: */
415: public byte[] Receive(int siz) throws IOException {
416: byte[] answer = new byte[siz];
417: Receive(answer, 0, siz);
418: return answer;
419: }
420:
421: /**
422: * Reads in a given number of bytes from the backend
423: *
424: * @param buf buffer to store result
425: * @param off offset in buffer
426: * @param siz number of bytes to read
427: * @exception IOException if a data I/O error occurs
428: */
429: public void Receive(byte[] buf, int off, int siz)
430: throws IOException {
431: int s = 0;
432:
433: while (s < siz) {
434: int w = pg_input.read(buf, off + s, siz - s);
435: if (w < 0)
436: throw new EOFException();
437: s += w;
438: }
439: }
440:
441: public void Skip(int size) throws IOException {
442: long s = 0;
443: while (s < size) {
444: s += pg_input.skip(size - s);
445: }
446: }
447:
448: /**
449: * Copy data from an input stream to the connection.
450: *
451: * @param inStream the stream to read data from
452: * @param remaining the number of bytes to copy
453: */
454: public void SendStream(InputStream inStream, int remaining)
455: throws IOException {
456: int expectedLength = remaining;
457: if (streamBuffer == null)
458: streamBuffer = new byte[8192];
459:
460: while (remaining > 0) {
461: int count = (remaining > streamBuffer.length ? streamBuffer.length
462: : remaining);
463: int readCount;
464:
465: try {
466: readCount = inStream.read(streamBuffer, 0, count);
467: if (readCount < 0)
468: throw new EOFException(
469: GT
470: .tr(
471: "Premature end of input stream, expected {0} bytes, but only read {1}.",
472: new Object[] {
473: new Integer(
474: expectedLength),
475: new Integer(
476: expectedLength
477: - remaining) }));
478: } catch (IOException ioe) {
479: while (remaining > 0) {
480: Send(streamBuffer, count);
481: remaining -= count;
482: count = (remaining > streamBuffer.length ? streamBuffer.length
483: : remaining);
484: }
485: throw new PGBindException(ioe);
486: }
487:
488: Send(streamBuffer, readCount);
489: remaining -= readCount;
490: }
491: }
492:
493: /**
494: * Flush any pending output to the backend.
495: * @exception IOException if an I/O error occurs
496: */
497: public void flush() throws IOException {
498: if (encodingWriter != null)
499: encodingWriter.flush();
500: pg_output.flush();
501: }
502:
503: /**
504: * Consume an expected EOF from the backend
505: * @exception SQLException if we get something other than an EOF
506: */
507: public void ReceiveEOF() throws SQLException, IOException {
508: int c = pg_input.read();
509: if (c < 0)
510: return;
511: throw new PSQLException(GT
512: .tr("Expected an EOF from server, got: {0}",
513: new Integer(c)), PSQLState.COMMUNICATION_ERROR);
514: }
515:
516: /**
517: * Closes the connection
518: *
519: * @exception IOException if an I/O Error occurs
520: */
521: public void close() throws IOException {
522: if (encodingWriter != null)
523: encodingWriter.close();
524:
525: pg_output.close();
526: pg_input.close();
527: connection.close();
528: }
529: }
|