001: package org.apache.james.mailrepository;
002:
003: import org.apache.avalon.cornerstone.services.store.StreamRepository;
004: import org.apache.james.core.MimeMessageUtil;
005: import org.apache.mailet.Mail;
006:
007: import javax.mail.MessagingException;
008:
009: import java.io.ByteArrayInputStream;
010: import java.io.ByteArrayOutputStream;
011: import java.io.IOException;
012: import java.io.InputStream;
013: import java.io.OutputStream;
014: import java.io.PipedInputStream;
015: import java.io.PipedOutputStream;
016:
017: /**
018: * This class provides an inputStream for a Mail object.
019: * If the Mail is larger than 4KB it uses Piped streams and a worker threads
020: * Otherwise it simply create a temporary byte buffer and does not create
021: * the worker thread.
022: *
023: * Note: Javamail (or the Activation Framework) already uses a worker threads when
024: * asked for an inputstream.
025: */
026: final class MessageInputStream extends InputStream {
027:
028: /**
029: * The size of the current message
030: */
031: private long size = -1;
032: /**
033: * The wrapped stream (Piped or Binary)
034: */
035: private InputStream wrapped;
036: /**
037: * If an excaption happens in the worker threads it's stored here
038: */
039: private Exception caughtException;
040: /**
041: * Stream repository used for dbfiles (null otherwise)
042: */
043: private StreamRepository streamRep;
044:
045: /**
046: * Main constructor. If srep is not null than we are using dbfiles and we stream
047: * the body to file and only the header to db.
048: */
049: public MessageInputStream(Mail mc, StreamRepository srep,
050: int sizeLimit) throws IOException, MessagingException {
051: super ();
052: caughtException = null;
053: streamRep = srep;
054: size = mc.getMessageSize();
055: // we use the pipes only when streamRep is null and the message size is greater than 4096
056: // Otherwise we should calculate the header size and not the message size when streamRep is not null (JAMES-475)
057: if (streamRep == null && size > sizeLimit) {
058: PipedOutputStream headerOut = new PipedOutputStream();
059: new Thread() {
060: private Mail mail;
061:
062: private PipedOutputStream out;
063:
064: public void run() {
065: try {
066: writeStream(mail, out);
067: } catch (IOException e) {
068: caughtException = e;
069: } catch (MessagingException e) {
070: caughtException = e;
071: }
072: }
073:
074: public Thread setParam(Mail mc,
075: PipedOutputStream headerOut) {
076: this .mail = mc;
077: this .out = headerOut;
078: return this ;
079: }
080: }.setParam(mc, (PipedOutputStream) headerOut).start();
081: wrapped = new PipedInputStream(headerOut);
082: } else {
083: ByteArrayOutputStream headerOut = new ByteArrayOutputStream();
084: writeStream(mc, headerOut);
085: wrapped = new ByteArrayInputStream(headerOut.toByteArray());
086: size = headerOut.size();
087: }
088: }
089:
090: /**
091: * Returns the size of the full message
092: */
093: public long getSize() {
094: return size;
095: }
096:
097: /**
098: * write the full mail to the stream
099: * This can be used by this object or by the worker threads.
100: */
101: private void writeStream(Mail mail, OutputStream out)
102: throws IOException, MessagingException {
103: OutputStream bodyOut = null;
104: try {
105: if (streamRep == null) {
106: //If there is no filestore, use the byte array to store headers
107: // and the body
108: bodyOut = out;
109: } else {
110: //Store the body in the stream repository
111: bodyOut = streamRep.put(mail.getName());
112: }
113:
114: //Write the message to the headerOut and bodyOut. bodyOut goes straight to the file
115: MimeMessageUtil.writeTo(mail.getMessage(), out, bodyOut);
116: out.flush();
117: bodyOut.flush();
118:
119: } finally {
120: closeOutputStreams(out, bodyOut);
121: }
122: }
123:
124: private void throwException() throws IOException {
125: try {
126: if (wrapped == null) {
127: throw new IOException(
128: "wrapped stream does not exists anymore");
129: } else if (caughtException instanceof IOException) {
130: throw (IOException) caughtException;
131: } else {
132: throw new IOException(
133: "Exception caugth in worker thread "
134: + caughtException.getMessage()) {
135: /**
136: * @see java.lang.Throwable#getCause()
137: */
138: public Throwable getCause() {
139: return caughtException;
140: }
141: };
142: }
143: } finally {
144: caughtException = null;
145: wrapped = null;
146: }
147: }
148:
149: /**
150: * Closes output streams used to update message
151: *
152: * @param headerStream the stream containing header information - potentially the same
153: * as the body stream
154: * @param bodyStream the stream containing body information
155: * @throws IOException
156: */
157: private void closeOutputStreams(OutputStream headerStream,
158: OutputStream bodyStream) throws IOException {
159: try {
160: // If the header stream is not the same as the body stream,
161: // close the header stream here.
162: if ((headerStream != null) && (headerStream != bodyStream)) {
163: headerStream.close();
164: }
165: } finally {
166: if (bodyStream != null) {
167: bodyStream.close();
168: }
169: }
170: }
171:
172: // wrapper methods
173:
174: /**
175: * @see java.io.InputStream#available()
176: */
177: public int available() throws IOException {
178: if (caughtException != null || wrapped == null) {
179: throwException();
180: }
181: return wrapped.available();
182: }
183:
184: /**
185: * @see java.io.Closeable#close()
186: */
187: public void close() throws IOException {
188: if (caughtException != null || wrapped == null) {
189: throwException();
190: }
191: wrapped.close();
192: wrapped = null;
193: }
194:
195: /**
196: * @see java.io.InputStream#mark(int)
197: */
198: public synchronized void mark(int arg0) {
199: wrapped.mark(arg0);
200: }
201:
202: /**
203: * @see java.io.InputStream#markSupported()
204: */
205: public boolean markSupported() {
206: return wrapped.markSupported();
207: }
208:
209: /**
210: * @see java.io.InputStream#read(byte[], int, int)
211: */
212: public int read(byte[] arg0, int arg1, int arg2) throws IOException {
213: if (caughtException != null || wrapped == null) {
214: throwException();
215: }
216: return wrapped.read(arg0, arg1, arg2);
217: }
218:
219: /**
220: * @see java.io.InputStream#read(byte[])
221: */
222: public int read(byte[] arg0) throws IOException {
223: if (caughtException != null || wrapped == null) {
224: throwException();
225: }
226: return wrapped.read(arg0);
227: }
228:
229: /**
230: * @see java.io.InputStream#reset()
231: */
232: public synchronized void reset() throws IOException {
233: if (caughtException != null || wrapped == null) {
234: throwException();
235: }
236: wrapped.reset();
237: }
238:
239: /**
240: * @see java.io.InputStream#skip(long)
241: */
242: public long skip(long arg0) throws IOException {
243: if (caughtException != null || wrapped == null) {
244: throwException();
245: }
246: return wrapped.skip(arg0);
247: }
248:
249: /**
250: * @see java.io.InputStream#read()
251: */
252: public int read() throws IOException {
253: if (caughtException != null || wrapped == null) {
254: throwException();
255: }
256: return wrapped.read();
257: }
258:
259: }
|