001: /*
002: * $HeadURL:https://svn.apache.org/repos/asf/jakarta/httpcomponents/httpcore/trunk/module-nio/src/main/java/org/apache/http/nio/buffer/ContentInputBuffer.java $
003: * $Revision:473999 $
004: * $Date:2006-11-12 17:31:38 +0000 (Sun, 12 Nov 2006) $
005: *
006: * ====================================================================
007: * Licensed to the Apache Software Foundation (ASF) under one
008: * or more contributor license agreements. See the NOTICE file
009: * distributed with this work for additional information
010: * regarding copyright ownership. The ASF licenses this file
011: * to you under the Apache License, Version 2.0 (the
012: * "License"); you may not use this file except in compliance
013: * with the License. You may obtain a copy of the License at
014: *
015: * http://www.apache.org/licenses/LICENSE-2.0
016: *
017: * Unless required by applicable law or agreed to in writing,
018: * software distributed under the License is distributed on an
019: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
020: * KIND, either express or implied. See the License for the
021: * specific language governing permissions and limitations
022: * under the License.
023: * ====================================================================
024: *
025: * This software consists of voluntary contributions made by many
026: * individuals on behalf of the Apache Software Foundation. For more
027: * information on the Apache Software Foundation, please see
028: * <http://www.apache.org/>.
029: *
030: */
031: package org.apache.http.nio.util;
032:
033: import java.io.IOException;
034: import java.io.InterruptedIOException;
035:
036: import org.apache.http.nio.ContentDecoder;
037: import org.apache.http.nio.IOControl;
038:
039: public class SharedInputBuffer extends ExpandableBuffer implements
040: ContentInputBuffer {
041:
042: private final IOControl ioctrl;
043: private final Object mutex;
044:
045: private volatile boolean shutdown = false;
046: private volatile boolean endOfStream = false;
047:
048: public SharedInputBuffer(int buffersize, final IOControl ioctrl,
049: final ByteBufferAllocator allocator) {
050: super (buffersize, allocator);
051: if (ioctrl == null) {
052: throw new IllegalArgumentException(
053: "I/O content control may not be null");
054: }
055: this .ioctrl = ioctrl;
056: this .mutex = new Object();
057: }
058:
059: public void reset() {
060: if (this .shutdown) {
061: return;
062: }
063: synchronized (this .mutex) {
064: clear();
065: this .endOfStream = false;
066: }
067: }
068:
069: public int consumeContent(final ContentDecoder decoder)
070: throws IOException {
071: if (this .shutdown) {
072: return -1;
073: }
074: synchronized (this .mutex) {
075: setInputMode();
076: int totalRead = 0;
077: int bytesRead;
078: while ((bytesRead = decoder.read(this .buffer)) > 0) {
079: totalRead += bytesRead;
080: }
081: if (bytesRead == -1 || decoder.isCompleted()) {
082: this .endOfStream = true;
083: }
084: if (totalRead > 0) {
085: this .ioctrl.suspendInput();
086: }
087: this .mutex.notifyAll();
088:
089: if (totalRead > 0) {
090: return totalRead;
091: } else {
092: if (this .endOfStream) {
093: return -1;
094: } else {
095: return 0;
096: }
097: }
098: }
099: }
100:
101: protected void waitForData() throws IOException {
102: synchronized (this .mutex) {
103: try {
104: while (!hasData() && !this .endOfStream) {
105: if (this .shutdown) {
106: throw new InterruptedIOException(
107: "Input operation aborted");
108: }
109: this .ioctrl.requestInput();
110: this .mutex.wait();
111: }
112: } catch (InterruptedException ex) {
113: throw new IOException(
114: "Interrupted while waiting for more data");
115: }
116: }
117: }
118:
119: public void shutdown() {
120: if (this .shutdown) {
121: return;
122: }
123: this .shutdown = true;
124: synchronized (this .mutex) {
125: this .mutex.notifyAll();
126: }
127: }
128:
129: protected boolean isShutdown() {
130: return this .shutdown;
131: }
132:
133: protected boolean isEndOfStream() {
134: return this .shutdown || (!hasData() && this .endOfStream);
135: }
136:
137: public int read() throws IOException {
138: if (this .shutdown) {
139: return -1;
140: }
141: synchronized (this .mutex) {
142: if (!hasData()) {
143: waitForData();
144: }
145: if (isEndOfStream()) {
146: return -1;
147: }
148: return this .buffer.get() & 0xff;
149: }
150: }
151:
152: public int read(final byte[] b, int off, int len)
153: throws IOException {
154: if (this .shutdown) {
155: return -1;
156: }
157: if (b == null) {
158: return 0;
159: }
160: synchronized (this .mutex) {
161: if (!hasData()) {
162: waitForData();
163: }
164: if (isEndOfStream()) {
165: return -1;
166: }
167: setOutputMode();
168: int chunk = len;
169: if (chunk > this .buffer.remaining()) {
170: chunk = this .buffer.remaining();
171: }
172: this .buffer.get(b, off, chunk);
173: return chunk;
174: }
175: }
176:
177: public int read(final byte[] b) throws IOException {
178: if (this .shutdown) {
179: return -1;
180: }
181: if (b == null) {
182: return 0;
183: }
184: return read(b, 0, b.length);
185: }
186:
187: }
|