001: /*
002: * Copyright 1999-2004 The Apache Software Foundation
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.jk.common;
018:
019: import java.io.IOException;
020: import java.io.InputStream;
021: import org.apache.jk.core.JkHandler;
022: import org.apache.jk.core.Msg;
023: import org.apache.jk.core.MsgContext;
024: import org.apache.tomcat.util.buf.ByteChunk;
025:
026: /** Generic input stream impl on top of ajp
027: */
028: public class JkInputStream extends InputStream {
029: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
030: .getLog(JkInputStream.class);
031:
032: public JkInputStream() {
033: }
034:
035: public int available() throws IOException {
036: if (log.isDebugEnabled())
037: log.debug("available(): " + blen + " " + pos);
038: return blen - pos;
039: }
040:
041: public void close() throws IOException {
042: if (log.isDebugEnabled())
043: log.debug("cloae() ");
044: this .closed = true;
045: }
046:
047: public void mark(int readLimit) {
048: }
049:
050: public boolean markSupported() {
051: return false;
052: }
053:
054: public void reset() throws IOException {
055: throw new IOException("reset() not supported");
056: }
057:
058: public int read() throws IOException {
059: if (contentLength == -1) {
060: return doRead1();
061: }
062: if (available <= 0) {
063: if (log.isDebugEnabled())
064: log.debug("doRead() nothing available");
065: return -1;
066: }
067: available--;
068:
069: return doRead1();
070: }
071:
072: public int read(byte[] b) throws IOException {
073: int rd = read(b, 0, b.length);
074: if (log.isDebugEnabled())
075: log.debug("read(" + b + ")=" + rd + " / " + b.length);
076: return rd;
077: }
078:
079: public int read(byte[] b, int off, int len) throws IOException {
080: int rd = -1;
081: if (contentLength == -1) {
082: rd = doRead1(b, off, len);
083: return rd;
084: }
085: if (available <= 0) {
086: if (log.isDebugEnabled())
087: log.debug("doRead() nothing available");
088: return -1;
089: }
090:
091: rd = doRead1(b, off, len);
092: available -= rd;
093: if (log.isDebugEnabled())
094: log.debug("Read: " + new String(b, off, len));
095: return rd;
096: }
097:
098: public long skip(long n) throws IOException {
099: if (n > Integer.MAX_VALUE) {
100: throw new IOException("can't skip than many: " + n);
101: }
102: // XXX if n is big, split this in multiple reads
103: byte[] b = new byte[(int) n];
104: return read(b, 0, b.length);
105: }
106:
107: // -------------------- Jk specific methods --------------------
108:
109: Msg bodyMsg = new MsgAjp();
110: MsgContext mc;
111:
112: // Total length of the body - maximum we can read
113: // If -1, we don't use any limit, and we don't count available
114: int contentLength;
115: // How much remains unread.
116: int available;
117:
118: boolean closed = false;
119:
120: // Ajp13 specific - needs refactoring for the new model
121: public static final int MAX_PACKET_SIZE = 8192;
122: public static final int H_SIZE = 4; // Size of basic packet header
123: public static final int MAX_READ_SIZE = MAX_PACKET_SIZE - H_SIZE
124: - 2;
125: public static final byte JK_AJP13_GET_BODY_CHUNK = 6;
126:
127: // Holds incoming chunks of request body data
128: // XXX We do a copy that could be avoided !
129: byte[] bodyBuff = new byte[9000];
130: int blen; // Length of current chunk of body data in buffer
131: int pos; // Current read position within that buffer
132:
133: boolean end_of_stream = false; // true if we've received an empty packet
134:
135: private int doRead1() throws IOException {
136: if (pos >= blen) {
137: if (!refillReadBuffer()) {
138: return -1;
139: }
140: }
141: int i = bodyBuff[pos++] & 0xFF;
142: if (log.isDebugEnabled())
143: log.debug("doRead1 " + (char) i);
144: return i; // prevent sign extension of byte value
145: }
146:
147: public int doRead1(byte[] b, int off, int len) throws IOException {
148: if (pos >= blen) {
149: if (!refillReadBuffer()) {
150: return -1;
151: }
152: }
153:
154: if (pos + len <= blen) { // Fear the off by one error
155: // Sanity check b.length > off + len?
156: System.arraycopy(bodyBuff, pos, b, off, len);
157: if (log.isDebugEnabled())
158: log.debug("doRead1: " + pos + " " + len + " " + blen);
159: if (log.isTraceEnabled())
160: log.trace("Data: \n" + new String(b, off, len));
161: pos += len;
162: return len;
163: }
164:
165: // Not enough data (blen < pos + len) or chunked encoded
166: int toCopy = len;
167: while (toCopy > 0) {
168: int bytesRemaining = blen - pos;
169: if (bytesRemaining < 0)
170: bytesRemaining = 0;
171: int c = bytesRemaining < toCopy ? bytesRemaining : toCopy;
172:
173: System.arraycopy(bodyBuff, pos, b, off, c);
174: if (log.isDebugEnabled())
175: log.debug("doRead2: " + pos + " " + len + " " + blen
176: + " " + c);
177: if (log.isTraceEnabled())
178: log.trace("Data: \n"
179: + new String(b, off, (len < blen - 1) ? len
180: : blen - 1));
181:
182: toCopy -= c;
183:
184: off += c;
185: pos += c; // In case we exactly consume the buffer
186:
187: if (toCopy > 0)
188: if (!refillReadBuffer()) { // Resets blen and pos
189: break;
190: }
191: }
192:
193: return len - toCopy;
194: }
195:
196: /** Must be called after the request is parsed, before
197: * any input
198: */
199: public void setContentLength(int i) {
200: contentLength = i;
201: available = i;
202: }
203:
204: /** Must be called when the stream is created
205: */
206: public void setMsgContext(MsgContext mc) {
207: this .mc = mc;
208: }
209:
210: /** Must be called before or after each request
211: */
212: public void recycle() {
213: available = 0;
214: blen = 0;
215: pos = 0;
216: closed = false;
217: end_of_stream = false;
218: contentLength = -1;
219: }
220:
221: /**
222: */
223: public int doRead(ByteChunk responseChunk) throws IOException {
224: if (log.isDebugEnabled())
225: log.debug("doRead " + pos + " " + blen + " " + available
226: + " " + end_of_stream + " "
227: + responseChunk.getOffset() + " "
228: + responseChunk.getLength());
229: if (end_of_stream) {
230: return -1;
231: }
232: if (blen == pos) {
233: if (!refillReadBuffer()) {
234: return -1;
235: }
236: }
237: responseChunk.setBytes(bodyBuff, pos, blen);
238: pos = blen;
239: return blen;
240: }
241:
242: /** Receive a chunk of data. Called to implement the
243: * 'special' packet in ajp13 and to receive the data
244: * after we send a GET_BODY packet
245: */
246: public boolean receive() throws IOException {
247: mc.setType(JkHandler.HANDLE_RECEIVE_PACKET);
248: bodyMsg.reset();
249: int err = mc.getSource().receive(bodyMsg, mc);
250: if (log.isDebugEnabled())
251: log.info("Receiving: getting request body chunk " + err
252: + " " + bodyMsg.getLen());
253:
254: if (err < 0) {
255: throw new IOException();
256: }
257:
258: pos = 0;
259: blen = 0;
260:
261: // No data received.
262: if (bodyMsg.getLen() == 0) { // just the header
263: // Don't mark 'end of stream' for the first chunk.
264: // end_of_stream = true;
265: return false;
266: }
267: blen = bodyMsg.peekInt();
268:
269: if (blen == 0) {
270: return false;
271: }
272:
273: if (blen > bodyBuff.length) {
274: bodyMsg.dump("Body");
275: }
276:
277: if (log.isTraceEnabled()) {
278: bodyMsg.dump("Body buffer");
279: }
280:
281: int cpl = bodyMsg.getBytes(bodyBuff);
282:
283: if (log.isDebugEnabled())
284: log.debug("Copy into body buffer2 " + bodyBuff + " " + cpl
285: + " " + blen);
286:
287: if (log.isTraceEnabled())
288: log.trace("Data:\n" + new String(bodyBuff, 0, cpl));
289:
290: return (blen > 0);
291: }
292:
293: /**
294: * Get more request body data from the web server and store it in the
295: * internal buffer.
296: *
297: * @return true if there is more data, false if not.
298: */
299: private boolean refillReadBuffer() throws IOException {
300: // If the server returns an empty packet, assume that that end of
301: // the stream has been reached (yuck -- fix protocol??).
302: if (end_of_stream) {
303: if (log.isDebugEnabled())
304: log.debug("refillReadBuffer: end of stream ");
305: return false;
306: }
307:
308: // Why not use outBuf??
309: bodyMsg.reset();
310: bodyMsg.appendByte(JK_AJP13_GET_BODY_CHUNK);
311: bodyMsg.appendInt(MAX_READ_SIZE);
312:
313: if (log.isDebugEnabled())
314: log.debug("refillReadBuffer " + Thread.currentThread());
315:
316: mc.setType(JkHandler.HANDLE_SEND_PACKET);
317: mc.getSource().send(bodyMsg, mc);
318:
319: // In JNI mode, response will be in bodyMsg. In TCP mode, response need to be
320: // read
321:
322: //bodyMsg.dump("refillReadBuffer ");
323:
324: boolean moreData = receive();
325: if (!moreData) {
326: end_of_stream = true;
327: }
328: return moreData;
329: }
330:
331: }
|