001: package org.jvnet.mimepull;
002:
003: import java.io.InputStream;
004: import java.io.IOException;
005: import java.nio.ByteBuffer;
006:
007: /**
008: * Represents an attachment part in a MIME message. MIME message parsing is done
009: * lazily using a pull parser, so the part may not have all the data. {@link #read}
010: * and {@link #readOnce} may trigger the actual parsing the message. In fact,
011: * parsing of an attachment part may be triggered by calling {@link #read} methods
012: * on some other attachemnt parts. All this happens behind the scenes so the
013: * application developer need not worry about these details.
014: *
015: * @author Jitendra Kotamraju
016: */
017: final class DataHead {
018:
019: /**
020: * Linked list to keep the part's content
021: */
022: volatile Chunk head, tail;
023:
024: /**
025: * If the part is stored in a file, non-null.
026: *
027: * If head is non-null, then we have the whole part in the file,
028: * otherwise the file is only partial.
029: */
030: DataFile dataFile;
031:
032: private final MIMEPart part;
033: private volatile int activeReads = 0; // TODO sync + and -
034:
035: boolean readOnce;
036: volatile long inMemory; // TODO sync + and -
037:
038: /**
039: * Used only for debugging. This records where readOnce() is called.
040: */
041: private Throwable consumedAt;
042:
043: DataHead(MIMEPart part) {
044: this .part = part;
045: }
046:
047: void addBody(ByteBuffer buf) {
048: synchronized (this ) {
049: inMemory += buf.limit();
050: }
051:
052: if (tail != null) {
053: tail = tail.createNext(this , buf);
054: } else {
055: head = tail = new Chunk(
056: new MemoryData(buf, part.msg.config));
057: }
058: }
059:
060: void doneParsing() {
061: if (activeReads == 0 && dataFile != null) {
062: head = tail = null;
063: dataFile.close();
064: }
065: }
066:
067: void close() {
068: if (dataFile != null) {
069: head = tail = null;
070: dataFile.close();
071: }
072: }
073:
074: /**
075: * Can get the attachment part's content multiple times. That means
076: * the full content needs to be there in memory or on the file system.
077: * Calling this method would trigger parsing for the part's data. So
078: * do not call this unless it is required(otherwise, just wrap MIMEPart
079: * into a object that returns InputStream for e.g DataHandler)
080: *
081: * @return data for the part's content
082: */
083: public InputStream read() {
084: if (readOnce) {
085: throw new IllegalStateException(
086: "readOnce() is called before, read() cannot be called later.");
087: }
088: // Have the complete data on the file system
089: if (part.parsed && dataFile != null && activeReads == 0) {
090: return dataFile.getInputStream();
091: }
092:
093: // Trigger parsing for the part
094: while (tail == null) {
095: if (!part.msg.makeProgress()) {
096: throw new IllegalStateException("No such Part: " + part);
097: }
098: }
099:
100: if (head == null) {
101: throw new IllegalStateException(
102: "Already read. Probably readOnce() is called before.");
103: }
104: synchronized (this ) {
105: ++activeReads;
106: }
107: return new ReadMultiStream();
108: }
109:
110: /**
111: * Used for an assertion. Returns true when readOnce() is not already called.
112: * or otherwise throw an exception.
113: *
114: * <p>
115: * Calling this method also marks the stream as 'consumed'
116: *
117: * @return true if readOnce() is not called before
118: */
119: private boolean unconsumed() {
120: if (consumedAt != null) {
121: AssertionError error = new AssertionError(
122: "readOnce() is already called before. See the nested exception from where it's called.");
123: error.initCause(consumedAt);
124: throw error;
125: }
126: consumedAt = new Exception().fillInStackTrace();
127: return true;
128: }
129:
130: /**
131: * Can get the attachment part's content only once. The content
132: * will be lost after the method. Content data is not be stored
133: * on the file system or is not kept in the memory for the
134: * following case:
135: * - Attachement parts contents are accessed sequentially
136: *
137: * In general, take advantage of this when the data is used only
138: * once.
139: *
140: * @return data for the part's content
141: */
142: public InputStream readOnce() {
143: assert unconsumed();
144: if (readOnce) {
145: throw new IllegalStateException(
146: "readOnce() is called before. It can only be called once.");
147: }
148: readOnce = true;
149: if (part.parsed && dataFile != null && activeReads == 0) {
150: return dataFile.getInputStream();
151: }
152: // Trigger parsing for the part
153: while (tail == null) {
154: if (!part.msg.makeProgress() && tail == null) {
155: throw new IllegalStateException("No such Part: " + part);
156: }
157: }
158: synchronized (this ) {
159: ++activeReads;
160: }
161: InputStream in = new ReadOnceStream();
162: head = null;
163: return in;
164: }
165:
166: class ReadMultiStream extends InputStream {
167: Chunk current;
168: int offset;
169: int len;
170: byte[] buf;
171:
172: public ReadMultiStream() {
173: this .current = head;
174: len = current.data.size();
175: buf = current.data.read();
176: }
177:
178: @Override
179: public int read(byte b[], int off, int sz) throws IOException {
180: if (!fetch())
181: return -1;
182:
183: sz = Math.min(sz, len - offset);
184: System.arraycopy(buf, offset, b, off, sz);
185: offset += sz;
186: return sz;
187: }
188:
189: public int read() throws IOException {
190: if (!fetch()) {
191: synchronized (this ) {
192: --activeReads;
193: }
194: return -1;
195: }
196: return (buf[offset++] & 0xff);
197: }
198:
199: void adjustInMemoryUsage() {
200: // Nothing to do in this case.
201: }
202:
203: /**
204: * Gets to the next chunk if we are done with the current one.
205: * @return
206: */
207: private boolean fetch() {
208: if (current == null) {
209: throw new IllegalStateException("Stream already closed");
210: }
211: while (offset == len) {
212: while (!part.parsed && current.next == null) {
213: part.msg.makeProgress();
214: }
215: current = current.next;
216:
217: if (current == null) {
218: return false;
219: }
220: adjustInMemoryUsage();
221: this .offset = 0;
222: this .buf = current.data.read();
223: this .len = current.data.size();
224: }
225: return true;
226: }
227:
228: public void close() throws IOException {
229: super .close();
230: current = null;
231: }
232: }
233:
234: final class ReadOnceStream extends ReadMultiStream {
235:
236: @Override
237: void adjustInMemoryUsage() {
238: synchronized (DataHead.this ) {
239: inMemory -= current.data.size(); // adjust current memory usage
240: }
241: }
242:
243: }
244:
245: }
|