001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.jk.common;
019:
020: import java.io.IOException;
021:
022: import org.apache.coyote.OutputBuffer;
023: import org.apache.coyote.InputBuffer;
024: import org.apache.coyote.Request;
025: import org.apache.coyote.Response;
026:
027: import org.apache.jk.core.Msg;
028: import org.apache.jk.core.MsgContext;
029:
030: import org.apache.tomcat.util.buf.ByteChunk;
031: import org.apache.tomcat.util.buf.MessageBytes;
032: import org.apache.tomcat.util.buf.C2BConverter;
033: import org.apache.tomcat.util.http.HttpMessages;
034: import org.apache.tomcat.util.http.MimeHeaders;
035:
036: /** Generic input stream impl on top of ajp
037: */
038: public class JkInputStream implements InputBuffer, OutputBuffer {
039: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
040: .getLog(JkInputStream.class);
041:
042: private Msg bodyMsg;
043: private Msg outputMsg;
044: private MsgContext mc;
045:
046: // Holds incoming chunks of request body data
047: private MessageBytes bodyBuff = MessageBytes.newInstance();
048: private MessageBytes tempMB = MessageBytes.newInstance();
049: private boolean end_of_stream = false;
050: private boolean isEmpty = true;
051: private boolean isFirst = true;
052: private boolean isReplay = false;
053: private boolean isReadRequired = false;
054:
055: static {
056: // Make certain HttpMessages is loaded for SecurityManager
057: try {
058: Class.forName("org.apache.tomcat.util.http.HttpMessages");
059: } catch (Exception ex) {
060: // ignore
061: }
062: }
063:
064: public JkInputStream(MsgContext context, int bsize) {
065: mc = context;
066: bodyMsg = new MsgAjp(bsize);
067: outputMsg = new MsgAjp(bsize);
068: }
069:
070: /**
071: * @deprecated
072: */
073: public JkInputStream(MsgContext context) {
074: this (context, 8 * 1024);
075: }
076:
077: // -------------------- Jk specific methods --------------------
078:
079: /**
080: * Set the flag saying that the server is sending a body
081: */
082: public void setIsReadRequired(boolean irr) {
083: isReadRequired = irr;
084: }
085:
086: /**
087: * Return the flag saying that the server is sending a body
088: */
089: public boolean isReadRequired() {
090: return isReadRequired;
091: }
092:
093: /** Must be called before or after each request
094: */
095: public void recycle() {
096: if (isReadRequired && isFirst) {
097: // The Servlet never read the request body, so we need to junk it
098: try {
099: receive();
100: } catch (IOException iex) {
101: log.debug("Error consuming request body", iex);
102: }
103: }
104:
105: end_of_stream = false;
106: isEmpty = true;
107: isFirst = true;
108: isReplay = false;
109: isReadRequired = false;
110: bodyBuff.recycle();
111: tempMB.recycle();
112: }
113:
114: public void endMessage() throws IOException {
115: outputMsg.reset();
116: outputMsg.appendByte(AjpConstants.JK_AJP13_END_RESPONSE);
117: outputMsg.appendByte(1);
118: mc.getSource().send(outputMsg, mc);
119: mc.getSource().flush(outputMsg, mc);
120: }
121:
122: // -------------------- OutputBuffer implementation --------------------
123:
124: public int doWrite(ByteChunk chunk, Response res)
125: throws IOException {
126: if (!res.isCommitted()) {
127: // Send the connector a request for commit. The connector should
128: // then validate the headers, send them (using sendHeader) and
129: // set the filters accordingly.
130: res.sendHeaders();
131: }
132:
133: int len = chunk.getLength();
134: byte buf[] = outputMsg.getBuffer();
135: // 4 - hardcoded, byte[] marshalling overhead
136: int chunkSize = buf.length - outputMsg.getHeaderLength() - 4;
137: int off = 0;
138: while (len > 0) {
139: int this Time = len;
140: if (this Time > chunkSize) {
141: this Time = chunkSize;
142: }
143: len -= this Time;
144:
145: outputMsg.reset();
146: outputMsg.appendByte(AjpConstants.JK_AJP13_SEND_BODY_CHUNK);
147: if (log.isTraceEnabled())
148: log
149: .trace("doWrite " + off + " " + this Time + " "
150: + len);
151: outputMsg.appendBytes(chunk.getBytes(), chunk.getOffset()
152: + off, this Time);
153: off += this Time;
154: mc.getSource().send(outputMsg, mc);
155: }
156: return 0;
157: }
158:
159: public int doRead(ByteChunk responseChunk, Request req)
160: throws IOException {
161:
162: if (log.isDebugEnabled())
163: log.debug("doRead " + end_of_stream + " "
164: + responseChunk.getOffset() + " "
165: + responseChunk.getLength());
166: if (end_of_stream) {
167: return -1;
168: }
169:
170: if (isFirst && isReadRequired) {
171: // Handle special first-body-chunk, but only if httpd expects it.
172: if (!receive()) {
173: return 0;
174: }
175: } else if (isEmpty) {
176: if (!refillReadBuffer()) {
177: return -1;
178: }
179: }
180: ByteChunk bc = bodyBuff.getByteChunk();
181: responseChunk.setBytes(bc.getBuffer(), bc.getStart(), bc
182: .getLength());
183: isEmpty = true;
184: return responseChunk.getLength();
185: }
186:
187: /** Receive a chunk of data. Called to implement the
188: * 'special' packet in ajp13 and to receive the data
189: * after we send a GET_BODY packet
190: */
191: public boolean receive() throws IOException {
192: isFirst = false;
193: bodyMsg.reset();
194: int err = mc.getSource().receive(bodyMsg, mc);
195: if (log.isDebugEnabled())
196: log.info("Receiving: getting request body chunk " + err
197: + " " + bodyMsg.getLen());
198:
199: if (err < 0) {
200: throw new IOException();
201: }
202:
203: // No data received.
204: if (bodyMsg.getLen() == 0) { // just the header
205: // Don't mark 'end of stream' for the first chunk.
206: // end_of_stream = true;
207: return false;
208: }
209: int blen = bodyMsg.peekInt();
210:
211: if (blen == 0) {
212: return false;
213: }
214:
215: if (log.isTraceEnabled()) {
216: bodyMsg.dump("Body buffer");
217: }
218:
219: bodyMsg.getBytes(bodyBuff);
220: if (log.isTraceEnabled())
221: log.trace("Data:\n" + bodyBuff);
222: isEmpty = false;
223: return true;
224: }
225:
226: /**
227: * Get more request body data from the web server and store it in the
228: * internal buffer.
229: *
230: * @return true if there is more data, false if not.
231: */
232: private boolean refillReadBuffer() throws IOException {
233: // If the server returns an empty packet, assume that that end of
234: // the stream has been reached (yuck -- fix protocol??).
235: if (isReplay) {
236: end_of_stream = true; // we've read everything there is
237: }
238: if (end_of_stream) {
239: if (log.isDebugEnabled())
240: log.debug("refillReadBuffer: end of stream ");
241: return false;
242: }
243:
244: // Why not use outBuf??
245: bodyMsg.reset();
246: bodyMsg.appendByte(AjpConstants.JK_AJP13_GET_BODY_CHUNK);
247: bodyMsg.appendInt(AjpConstants.MAX_READ_SIZE);
248:
249: if (log.isDebugEnabled())
250: log.debug("refillReadBuffer " + Thread.currentThread());
251:
252: mc.getSource().send(bodyMsg, mc);
253: mc.getSource().flush(bodyMsg, mc); // Server needs to get it
254:
255: // In JNI mode, response will be in bodyMsg. In TCP mode, response need to be
256: // read
257:
258: boolean moreData = receive();
259: if (!moreData) {
260: end_of_stream = true;
261: }
262: return moreData;
263: }
264:
265: public void appendHead(Response res) throws IOException {
266: if (log.isDebugEnabled())
267: log.debug("COMMIT sending headers " + res + " "
268: + res.getMimeHeaders());
269:
270: C2BConverter c2b = mc.getConverter();
271:
272: outputMsg.reset();
273: outputMsg.appendByte(AjpConstants.JK_AJP13_SEND_HEADERS);
274: outputMsg.appendInt(res.getStatus());
275:
276: String message = res.getMessage();
277: if (message == null) {
278: message = HttpMessages.getMessage(res.getStatus());
279: } else {
280: message = message.replace('\n', ' ').replace('\r', ' ');
281: }
282: tempMB.setString(message);
283: c2b.convert(tempMB);
284: outputMsg.appendBytes(tempMB);
285:
286: // XXX add headers
287:
288: MimeHeaders headers = res.getMimeHeaders();
289: String contentType = res.getContentType();
290: if (contentType != null) {
291: headers.setValue("Content-Type").setString(contentType);
292: }
293: String contentLanguage = res.getContentLanguage();
294: if (contentLanguage != null) {
295: headers.setValue("Content-Language").setString(
296: contentLanguage);
297: }
298: long contentLength = res.getContentLengthLong();
299: if (contentLength >= 0) {
300: headers.setValue("Content-Length").setLong(contentLength);
301: }
302: int numHeaders = headers.size();
303: outputMsg.appendInt(numHeaders);
304: for (int i = 0; i < numHeaders; i++) {
305: MessageBytes hN = headers.getName(i);
306: // no header to sc conversion - there's little benefit
307: // on this direction
308: c2b.convert(hN);
309: outputMsg.appendBytes(hN);
310:
311: MessageBytes hV = headers.getValue(i);
312: c2b.convert(hV);
313: outputMsg.appendBytes(hV);
314: }
315: mc.getSource().send(outputMsg, mc);
316: }
317:
318: /**
319: * Set the replay buffer for Form auth
320: */
321: public void setReplay(ByteChunk replay) {
322: isFirst = false;
323: isEmpty = false;
324: isReplay = true;
325: bodyBuff.setBytes(replay.getBytes(), replay.getStart(), replay
326: .getLength());
327: }
328:
329: }
|