001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.coyote.http11;
019:
020: import java.io.IOException;
021: import java.nio.ByteBuffer;
022: import java.nio.channels.SelectionKey;
023: import java.nio.channels.Selector;
024:
025: import org.apache.coyote.ActionCode;
026: import org.apache.coyote.OutputBuffer;
027: import org.apache.coyote.Response;
028: import org.apache.tomcat.util.buf.ByteChunk;
029: import org.apache.tomcat.util.buf.CharChunk;
030: import org.apache.tomcat.util.buf.MessageBytes;
031: import org.apache.tomcat.util.http.HttpMessages;
032: import org.apache.tomcat.util.http.MimeHeaders;
033: import org.apache.tomcat.util.net.NioChannel;
034: import org.apache.tomcat.util.net.NioEndpoint;
035: import org.apache.tomcat.util.net.NioSelectorPool;
036: import org.apache.tomcat.util.res.StringManager;
037:
038: /**
039: * Output buffer.
040: *
041: * @author <a href="mailto:remm@apache.org">Remy Maucherat</a>
042: * @author Filip Hanik
043: */
044: public class InternalNioOutputBuffer implements OutputBuffer {
045:
046: // -------------------------------------------------------------- Constants
047:
048: // ----------------------------------------------------------- Constructors
049: int bbufLimit = 0;
050:
051: /**
052: * Default constructor.
053: */
054: public InternalNioOutputBuffer(Response response) {
055: this (response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
056: }
057:
058: /**
059: * Alternate constructor.
060: */
061: public InternalNioOutputBuffer(Response response,
062: int headerBufferSize, long writeTimeout) {
063:
064: this .response = response;
065: headers = response.getMimeHeaders();
066:
067: buf = new byte[headerBufferSize];
068:
069: if (headerBufferSize < (8 * 1024)) {
070: bbufLimit = 6 * 1500;
071: } else {
072: bbufLimit = (headerBufferSize / 1500 + 1) * 1500;
073: }
074: //bbuf = ByteBuffer.allocateDirect(bbufLimit);
075:
076: outputStreamOutputBuffer = new SocketOutputBuffer();
077:
078: filterLibrary = new OutputFilter[0];
079: activeFilters = new OutputFilter[0];
080: lastActiveFilter = -1;
081:
082: committed = false;
083: finished = false;
084:
085: this .writeTimeout = writeTimeout;
086:
087: // Cause loading of HttpMessages
088: HttpMessages.getMessage(200);
089:
090: }
091:
092: // -------------------------------------------------------------- Variables
093:
094: /**
095: * The string manager for this package.
096: */
097: protected static StringManager sm = StringManager
098: .getManager(Constants.Package);
099:
100: // ----------------------------------------------------- Instance Variables
101:
102: /**
103: * Associated Coyote response.
104: */
105: protected Response response;
106:
107: /**
108: * Headers of the associated request.
109: */
110: protected MimeHeaders headers;
111:
112: /**
113: * Committed flag.
114: */
115: protected boolean committed;
116:
117: /**
118: * Finished flag.
119: */
120: protected boolean finished;
121:
122: /**
123: * Pointer to the current write buffer.
124: */
125: protected byte[] buf;
126:
127: /**
128: * Position in the buffer.
129: */
130: protected int pos;
131:
132: /**
133: * Underlying socket.
134: */
135: protected NioChannel socket;
136:
137: /**
138: * Selector pool, for blocking reads and blocking writes
139: */
140: protected NioSelectorPool pool;
141:
142: /**
143: * Underlying output buffer.
144: */
145: protected OutputBuffer outputStreamOutputBuffer;
146:
147: /**
148: * Filter library.
149: * Note: Filter[0] is always the "chunked" filter.
150: */
151: protected OutputFilter[] filterLibrary;
152:
153: /**
154: * Active filter (which is actually the top of the pipeline).
155: */
156: protected OutputFilter[] activeFilters;
157:
158: /**
159: * Index of the last active filter.
160: */
161: protected int lastActiveFilter;
162:
163: /**
164: * Write time out in milliseconds
165: */
166: protected long writeTimeout = -1;
167:
168: // ------------------------------------------------------------- Properties
169:
170: /**
171: * Set the underlying socket.
172: */
173: public void setSocket(NioChannel socket) {
174: this .socket = socket;
175: }
176:
177: public void setWriteTimeout(long writeTimeout) {
178: this .writeTimeout = writeTimeout;
179: }
180:
181: /**
182: * Get the underlying socket input stream.
183: */
184: public NioChannel getSocket() {
185: return socket;
186: }
187:
188: public long getWriteTimeout() {
189: return writeTimeout;
190: }
191:
192: public void setSelectorPool(NioSelectorPool pool) {
193: this .pool = pool;
194: }
195:
196: public NioSelectorPool getSelectorPool() {
197: return pool;
198: }
199:
200: /**
201: * Set the socket buffer size.
202: */
203: public void setSocketBuffer(int socketBufferSize) {
204: // FIXME: Remove
205: }
206:
207: /**
208: * Add an output filter to the filter library.
209: */
210: public void addFilter(OutputFilter filter) {
211:
212: OutputFilter[] newFilterLibrary = new OutputFilter[filterLibrary.length + 1];
213: for (int i = 0; i < filterLibrary.length; i++) {
214: newFilterLibrary[i] = filterLibrary[i];
215: }
216: newFilterLibrary[filterLibrary.length] = filter;
217: filterLibrary = newFilterLibrary;
218:
219: activeFilters = new OutputFilter[filterLibrary.length];
220:
221: }
222:
223: /**
224: * Get filters.
225: */
226: public OutputFilter[] getFilters() {
227:
228: return filterLibrary;
229:
230: }
231:
232: /**
233: * Clear filters.
234: */
235: public void clearFilters() {
236:
237: filterLibrary = new OutputFilter[0];
238: lastActiveFilter = -1;
239:
240: }
241:
242: /**
243: * Add an output filter to the filter library.
244: */
245: public void addActiveFilter(OutputFilter filter) {
246:
247: if (lastActiveFilter == -1) {
248: filter.setBuffer(outputStreamOutputBuffer);
249: } else {
250: for (int i = 0; i <= lastActiveFilter; i++) {
251: if (activeFilters[i] == filter)
252: return;
253: }
254: filter.setBuffer(activeFilters[lastActiveFilter]);
255: }
256:
257: activeFilters[++lastActiveFilter] = filter;
258:
259: filter.setResponse(response);
260:
261: }
262:
263: // --------------------------------------------------------- Public Methods
264:
265: /**
266: * Flush the response.
267: *
268: * @throws IOException an undelying I/O error occured
269: */
270: public void flush() throws IOException {
271:
272: if (!committed) {
273:
274: // Send the connector a request for commit. The connector should
275: // then validate the headers, send them (using sendHeader) and
276: // set the filters accordingly.
277: response.action(ActionCode.ACTION_COMMIT, null);
278:
279: }
280:
281: // Flush the current buffer
282: flushBuffer();
283:
284: }
285:
286: /**
287: * Reset current response.
288: *
289: * @throws IllegalStateException if the response has already been committed
290: */
291: public void reset() {
292:
293: if (committed)
294: throw new IllegalStateException(/*FIXME:Put an error message*/);
295:
296: // Recycle Request object
297: response.recycle();
298:
299: }
300:
301: /**
302: * Recycle the output buffer. This should be called when closing the
303: * connection.
304: */
305: public void recycle() {
306:
307: // Recycle Request object
308: response.recycle();
309: socket.getBufHandler().getWriteBuffer().clear();
310:
311: socket = null;
312: pos = 0;
313: lastActiveFilter = -1;
314: committed = false;
315: finished = false;
316:
317: }
318:
319: /**
320: * End processing of current HTTP request.
321: * Note: All bytes of the current request should have been already
322: * consumed. This method only resets all the pointers so that we are ready
323: * to parse the next HTTP request.
324: */
325: public void nextRequest() {
326:
327: // Recycle Request object
328: response.recycle();
329:
330: // Recycle filters
331: for (int i = 0; i <= lastActiveFilter; i++) {
332: activeFilters[i].recycle();
333: }
334:
335: // Reset pointers
336: pos = 0;
337: lastActiveFilter = -1;
338: committed = false;
339: finished = false;
340:
341: }
342:
343: /**
344: * End request.
345: *
346: * @throws IOException an undelying I/O error occured
347: */
348: public void endRequest() throws IOException {
349:
350: if (!committed) {
351:
352: // Send the connector a request for commit. The connector should
353: // then validate the headers, send them (using sendHeader) and
354: // set the filters accordingly.
355: response.action(ActionCode.ACTION_COMMIT, null);
356:
357: }
358:
359: if (finished)
360: return;
361:
362: if (lastActiveFilter != -1)
363: activeFilters[lastActiveFilter].end();
364:
365: flushBuffer();
366:
367: finished = true;
368:
369: }
370:
371: // ------------------------------------------------ HTTP/1.1 Output Methods
372:
373: /**
374: * Send an acknoledgement.
375: */
376: public void sendAck() throws IOException {
377:
378: if (!committed) {
379: //Socket.send(socket, Constants.ACK_BYTES, 0, Constants.ACK_BYTES.length) < 0
380: ByteBuffer buf = ByteBuffer.wrap(Constants.ACK_BYTES, 0,
381: Constants.ACK_BYTES.length);
382: writeToSocket(buf, false);
383: }
384:
385: }
386:
387: private synchronized void writeToSocket(ByteBuffer bytebuffer,
388: boolean flip) throws IOException {
389: //int limit = bytebuffer.position();
390: if (flip)
391: bytebuffer.flip();
392: int written = 0;
393: Selector selector = null;
394: try {
395: selector = getSelectorPool().get();
396: } catch (IOException x) {
397: //ignore
398: }
399: try {
400: written = getSelectorPool().write(bytebuffer, socket,
401: selector, writeTimeout);
402: //make sure we are flushed
403: do {
404: if (socket.flush(true, selector, writeTimeout))
405: break;
406: } while (true);
407: } finally {
408: if (selector != null)
409: getSelectorPool().put(selector);
410: }
411: socket.getBufHandler().getWriteBuffer().clear();
412: this .total = 0;
413: }
414:
415: /**
416: * Send the response status line.
417: */
418: public void sendStatus() {
419:
420: // Write protocol name
421: write(Constants.HTTP_11_BYTES);
422: buf[pos++] = Constants.SP;
423:
424: // Write status code
425: int status = response.getStatus();
426: switch (status) {
427: case 200:
428: write(Constants._200_BYTES);
429: break;
430: case 400:
431: write(Constants._400_BYTES);
432: break;
433: case 404:
434: write(Constants._404_BYTES);
435: break;
436: default:
437: write(status);
438: }
439:
440: buf[pos++] = Constants.SP;
441:
442: // Write message
443: String message = response.getMessage();
444: if (message == null) {
445: write(HttpMessages.getMessage(status));
446: } else {
447: write(message);
448: }
449:
450: // End the response status line
451: buf[pos++] = Constants.CR;
452: buf[pos++] = Constants.LF;
453:
454: }
455:
456: /**
457: * Send a header.
458: *
459: * @param name Header name
460: * @param value Header value
461: */
462: public void sendHeader(MessageBytes name, MessageBytes value) {
463:
464: write(name);
465: buf[pos++] = Constants.COLON;
466: buf[pos++] = Constants.SP;
467: write(value);
468: buf[pos++] = Constants.CR;
469: buf[pos++] = Constants.LF;
470:
471: }
472:
473: /**
474: * Send a header.
475: *
476: * @param name Header name
477: * @param value Header value
478: */
479: public void sendHeader(ByteChunk name, ByteChunk value) {
480:
481: write(name);
482: buf[pos++] = Constants.COLON;
483: buf[pos++] = Constants.SP;
484: write(value);
485: buf[pos++] = Constants.CR;
486: buf[pos++] = Constants.LF;
487:
488: }
489:
490: /**
491: * Send a header.
492: *
493: * @param name Header name
494: * @param value Header value
495: */
496: public void sendHeader(String name, String value) {
497:
498: write(name);
499: buf[pos++] = Constants.COLON;
500: buf[pos++] = Constants.SP;
501: write(value);
502: buf[pos++] = Constants.CR;
503: buf[pos++] = Constants.LF;
504:
505: }
506:
507: /**
508: * End the header block.
509: */
510: public void endHeaders() {
511:
512: buf[pos++] = Constants.CR;
513: buf[pos++] = Constants.LF;
514:
515: }
516:
517: // --------------------------------------------------- OutputBuffer Methods
518:
519: /**
520: * Write the contents of a byte chunk.
521: *
522: * @param chunk byte chunk
523: * @return number of bytes written
524: * @throws IOException an undelying I/O error occured
525: */
526: public int doWrite(ByteChunk chunk, Response res)
527: throws IOException {
528:
529: if (!committed) {
530:
531: // Send the connector a request for commit. The connector should
532: // then validate the headers, send them (using sendHeaders) and
533: // set the filters accordingly.
534: response.action(ActionCode.ACTION_COMMIT, null);
535:
536: }
537:
538: if (lastActiveFilter == -1)
539: return outputStreamOutputBuffer.doWrite(chunk, res);
540: else
541: return activeFilters[lastActiveFilter].doWrite(chunk, res);
542:
543: }
544:
545: // ------------------------------------------------------ Protected Methods
546:
547: /**
548: * Commit the response.
549: *
550: * @throws IOException an undelying I/O error occured
551: */
552: protected void commit() throws IOException {
553:
554: // The response is now committed
555: committed = true;
556: response.setCommitted(true);
557:
558: if (pos > 0) {
559: // Sending the response header buffer
560: addToBB(buf, 0, pos);
561: }
562:
563: }
564:
565: int total = 0;
566:
567: private synchronized void addToBB(byte[] buf, int offset, int length)
568: throws IOException {
569: while (socket.getBufHandler().getWriteBuffer().remaining() < length) {
570: flushBuffer();
571: }
572: socket.getBufHandler().getWriteBuffer()
573: .put(buf, offset, length);
574: total += length;
575: NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment) socket
576: .getAttachment(false);
577: if (ka != null)
578: ka.access();//prevent timeouts for just doing client writes
579: }
580:
581: /**
582: * This method will write the contents of the specyfied message bytes
583: * buffer to the output stream, without filtering. This method is meant to
584: * be used to write the response header.
585: *
586: * @param mb data to be written
587: */
588: protected void write(MessageBytes mb) {
589:
590: if (mb.getType() == MessageBytes.T_BYTES) {
591: ByteChunk bc = mb.getByteChunk();
592: write(bc);
593: } else if (mb.getType() == MessageBytes.T_CHARS) {
594: CharChunk cc = mb.getCharChunk();
595: write(cc);
596: } else {
597: write(mb.toString());
598: }
599:
600: }
601:
602: /**
603: * This method will write the contents of the specyfied message bytes
604: * buffer to the output stream, without filtering. This method is meant to
605: * be used to write the response header.
606: *
607: * @param bc data to be written
608: */
609: protected void write(ByteChunk bc) {
610:
611: // Writing the byte chunk to the output buffer
612: int length = bc.getLength();
613: System
614: .arraycopy(bc.getBytes(), bc.getStart(), buf, pos,
615: length);
616: pos = pos + length;
617:
618: }
619:
620: /**
621: * This method will write the contents of the specyfied char
622: * buffer to the output stream, without filtering. This method is meant to
623: * be used to write the response header.
624: *
625: * @param cc data to be written
626: */
627: protected void write(CharChunk cc) {
628:
629: int start = cc.getStart();
630: int end = cc.getEnd();
631: char[] cbuf = cc.getBuffer();
632: for (int i = start; i < end; i++) {
633: char c = cbuf[i];
634: // Note: This is clearly incorrect for many strings,
635: // but is the only consistent approach within the current
636: // servlet framework. It must suffice until servlet output
637: // streams properly encode their output.
638: if ((c <= 31) && (c != 9)) {
639: c = ' ';
640: } else if (c == 127) {
641: c = ' ';
642: }
643: buf[pos++] = (byte) c;
644: }
645:
646: }
647:
648: /**
649: * This method will write the contents of the specyfied byte
650: * buffer to the output stream, without filtering. This method is meant to
651: * be used to write the response header.
652: *
653: * @param b data to be written
654: */
655: public void write(byte[] b) {
656:
657: // Writing the byte chunk to the output buffer
658: System.arraycopy(b, 0, buf, pos, b.length);
659: pos = pos + b.length;
660:
661: }
662:
663: /**
664: * This method will write the contents of the specyfied String to the
665: * output stream, without filtering. This method is meant to be used to
666: * write the response header.
667: *
668: * @param s data to be written
669: */
670: protected void write(String s) {
671:
672: if (s == null)
673: return;
674:
675: // From the Tomcat 3.3 HTTP/1.0 connector
676: int len = s.length();
677: for (int i = 0; i < len; i++) {
678: char c = s.charAt(i);
679: // Note: This is clearly incorrect for many strings,
680: // but is the only consistent approach within the current
681: // servlet framework. It must suffice until servlet output
682: // streams properly encode their output.
683: if ((c <= 31) && (c != 9)) {
684: c = ' ';
685: } else if (c == 127) {
686: c = ' ';
687: }
688: buf[pos++] = (byte) c;
689: }
690:
691: }
692:
693: /**
694: * This method will print the specified integer to the output stream,
695: * without filtering. This method is meant to be used to write the
696: * response header.
697: *
698: * @param i data to be written
699: */
700: protected void write(int i) {
701:
702: write(String.valueOf(i));
703:
704: }
705:
706: /**
707: * Callback to write data from the buffer.
708: */
709: protected void flushBuffer() throws IOException {
710:
711: //prevent timeout for async,
712: SelectionKey key = socket.getIOChannel().keyFor(
713: socket.getPoller().getSelector());
714: if (key != null) {
715: NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key
716: .attachment();
717: attach.access();
718: }
719:
720: //write to the socket, if there is anything to write
721: if (socket.getBufHandler().getWriteBuffer().position() > 0) {
722: writeToSocket(socket.getBufHandler().getWriteBuffer(), true);
723: }
724: }
725:
726: // ----------------------------------- OutputStreamOutputBuffer Inner Class
727:
728: /**
729: * This class is an output buffer which will write data to an output
730: * stream.
731: */
732: protected class SocketOutputBuffer implements OutputBuffer {
733:
734: /**
735: * Write chunk.
736: */
737: public int doWrite(ByteChunk chunk, Response res)
738: throws IOException {
739:
740: int len = chunk.getLength();
741: int start = chunk.getStart();
742: byte[] b = chunk.getBuffer();
743: while (len > 0) {
744: int thisTime = len;
745: if (socket.getBufHandler().getWriteBuffer().position() == socket
746: .getBufHandler().getWriteBuffer().capacity()) {
747: flushBuffer();
748: }
749: if (thisTime > socket.getBufHandler().getWriteBuffer()
750: .remaining()) {
751: thisTime = socket.getBufHandler().getWriteBuffer()
752: .remaining();
753: }
754: addToBB(b, start, thisTime);
755: len = len - thisTime;
756: start = start + thisTime;
757: }
758: return chunk.getLength();
759:
760: }
761:
762: }
763:
764: }
|