001: /*
002: * $HeadURL:https://svn.apache.org/repos/asf/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/buffer/ContentOutputBuffer.java $
003: * $Revision:473999 $
004: * $Date:2006-11-12 17:31:38 +0000 (Sun, 12 Nov 2006) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031: package org.apache.http.nio.util;
032:
033: import java.io.IOException;
034: import java.io.InterruptedIOException;
035:
036: import org.apache.http.nio.ContentEncoder;
037: import org.apache.http.nio.IOControl;
038:
039: public class SharedOutputBuffer extends ExpandableBuffer implements
040: ContentOutputBuffer {
041:
042: private final IOControl ioctrl;
043: private final Object mutex;
044:
045: private volatile boolean shutdown = false;
046: private volatile boolean endOfStream;
047:
048: public SharedOutputBuffer(int buffersize, final IOControl ioctrl,
049: final ByteBufferAllocator allocator) {
050: super (buffersize, allocator);
051: if (ioctrl == null) {
052: throw new IllegalArgumentException(
053: "I/O content control may not be null");
054: }
055: this .ioctrl = ioctrl;
056: this .mutex = new Object();
057: this .endOfStream = false;
058: }
059:
060: public void reset() {
061: if (this .shutdown) {
062: return;
063: }
064: synchronized (this .mutex) {
065: clear();
066: this .endOfStream = false;
067: }
068: }
069:
070: public int produceContent(final ContentEncoder encoder)
071: throws IOException {
072: if (this .shutdown) {
073: return -1;
074: }
075: synchronized (this .mutex) {
076: setOutputMode();
077: int bytesWritten = 0;
078: if (hasData()) {
079: bytesWritten = encoder.write(this .buffer);
080: if (encoder.isCompleted()) {
081: this .endOfStream = true;
082: }
083: }
084: if (!hasData()) {
085: // No more buffered content
086: // If at the end of the stream, terminate
087: if (this .endOfStream && !encoder.isCompleted()) {
088: encoder.complete();
089: }
090: if (!this .endOfStream) {
091: // suspend output events
092: this .ioctrl.suspendOutput();
093: }
094: }
095: this .mutex.notifyAll();
096: return bytesWritten;
097: }
098: }
099:
100: public void shutdown() {
101: if (this .shutdown) {
102: return;
103: }
104: this .shutdown = true;
105: synchronized (this .mutex) {
106: this .mutex.notifyAll();
107: }
108: }
109:
110: public void write(final byte[] b, int off, int len)
111: throws IOException {
112: if (b == null) {
113: return;
114: }
115: synchronized (this .mutex) {
116: if (this .shutdown || this .endOfStream) {
117: throw new IllegalStateException(
118: "Buffer already closed for writing");
119: }
120: setInputMode();
121: int remaining = len;
122: while (remaining > 0) {
123: if (!this .buffer.hasRemaining()) {
124: flushContent();
125: setInputMode();
126: }
127: int chunk = Math
128: .min(remaining, this .buffer.remaining());
129: this .buffer.put(b, off, chunk);
130: remaining -= chunk;
131: off += chunk;
132: }
133: }
134: }
135:
136: public void write(final byte[] b) throws IOException {
137: if (b == null) {
138: return;
139: }
140: write(b, 0, b.length);
141: }
142:
143: public void write(int b) throws IOException {
144: synchronized (this .mutex) {
145: if (this .shutdown || this .endOfStream) {
146: throw new IllegalStateException(
147: "Buffer already closed for writing");
148: }
149: setInputMode();
150: if (!this .buffer.hasRemaining()) {
151: flushContent();
152: setInputMode();
153: }
154: this .buffer.put((byte) b);
155: }
156: }
157:
158: public void flush() throws IOException {
159: }
160:
161: private void flushContent() throws IOException {
162: synchronized (this .mutex) {
163: try {
164: while (hasData()) {
165: if (this .shutdown) {
166: throw new InterruptedIOException(
167: "Output operation aborted");
168: }
169: this .ioctrl.requestOutput();
170: this .mutex.wait();
171: }
172: } catch (InterruptedException ex) {
173: throw new IOException(
174: "Interrupted while flushing the content buffer");
175: }
176: }
177: }
178:
179: public void writeCompleted() throws IOException {
180: if (this .endOfStream) {
181: return;
182: }
183: this .endOfStream = true;
184: this.ioctrl.requestOutput();
185: }
186:
187: }
|