001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.mailrepository;
019:
020: import org.apache.avalon.cornerstone.services.store.ObjectRepository;
021: import org.apache.avalon.cornerstone.services.store.Store;
022: import org.apache.avalon.cornerstone.services.store.StreamRepository;
023: import org.apache.avalon.framework.activity.Initializable;
024: import org.apache.avalon.framework.configuration.Configurable;
025: import org.apache.avalon.framework.configuration.Configuration;
026: import org.apache.avalon.framework.configuration.ConfigurationException;
027: import org.apache.avalon.framework.configuration.DefaultConfiguration;
028: import org.apache.avalon.framework.logger.AbstractLogEnabled;
029: import org.apache.avalon.framework.service.ServiceException;
030: import org.apache.avalon.framework.service.ServiceManager;
031: import org.apache.avalon.framework.service.Serviceable;
032: import org.apache.james.core.MimeMessageCopyOnWriteProxy;
033: import org.apache.james.core.MimeMessageWrapper;
034: import org.apache.james.services.MailRepository;
035: import org.apache.james.util.Lock;
036: import org.apache.mailet.Mail;
037:
038: import javax.mail.MessagingException;
039: import javax.mail.internet.MimeMessage;
040:
041: import java.io.OutputStream;
042: import java.util.ArrayList;
043: import java.util.Collection;
044: import java.util.Collections;
045: import java.util.HashSet;
046: import java.util.Iterator;
047: import java.util.Set;
048:
049: /**
050: * Implementation of a MailRepository on a FileSystem.
051: *
052: * Requires a configuration element in the .conf.xml file of the form:
053: * <repository destinationURL="file://path-to-root-dir-for-repository"
054: * type="MAIL"
055: * model="SYNCHRONOUS"/>
056: * Requires a logger called MailRepository.
057: *
058: * @version 1.0.0, 24/04/1999
059: */
060: public class AvalonMailRepository extends AbstractLogEnabled implements
061: MailRepository, Configurable, Serviceable, Initializable {
062:
063: /**
064: * Whether 'deep debugging' is turned on.
065: */
066: protected final static boolean DEEP_DEBUG = false;
067:
068: private Lock lock;
069: private Store store;
070: private StreamRepository sr;
071: private ObjectRepository or;
072: private String destination;
073: private Set keys;
074: private boolean fifo;
075: private boolean cacheKeys; // experimental: for use with write mostly repositories such as spam and error
076:
077: /**
078: * @see org.apache.avalon.framework.service.Serviceable#compose(ServiceManager )
079: */
080: public void service(final ServiceManager componentManager)
081: throws ServiceException {
082: store = (Store) componentManager.lookup(Store.ROLE);
083: }
084:
085: /**
086: * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
087: */
088: public void configure(Configuration conf)
089: throws ConfigurationException {
090: destination = conf.getAttribute("destinationURL");
091: if (getLogger().isDebugEnabled()) {
092: getLogger().debug(
093: "AvalonMailRepository.destinationURL: "
094: + destination);
095: }
096: String checkType = conf.getAttribute("type");
097: if (!(checkType.equals("MAIL") || checkType.equals("SPOOL"))) {
098: String exceptionString = "Attempt to configure AvalonMailRepository as "
099: + checkType;
100: if (getLogger().isWarnEnabled()) {
101: getLogger().warn(exceptionString);
102: }
103: throw new ConfigurationException(exceptionString);
104: }
105: fifo = conf.getAttributeAsBoolean("FIFO", false);
106: cacheKeys = conf.getAttributeAsBoolean("CACHEKEYS", true);
107: // ignore model
108: }
109:
110: /**
111: * @see org.apache.avalon.framework.activity.Initializable#initialize()
112: */
113: public void initialize() throws Exception {
114: try {
115: //prepare Configurations for object and stream repositories
116: DefaultConfiguration objectConfiguration = new DefaultConfiguration(
117: "repository",
118: "generated:AvalonFileRepository.compose()");
119:
120: objectConfiguration.setAttribute("destinationURL",
121: destination);
122: objectConfiguration.setAttribute("type", "OBJECT");
123: objectConfiguration.setAttribute("model", "SYNCHRONOUS");
124:
125: DefaultConfiguration streamConfiguration = new DefaultConfiguration(
126: "repository",
127: "generated:AvalonFileRepository.compose()");
128:
129: streamConfiguration.setAttribute("destinationURL",
130: destination);
131: streamConfiguration.setAttribute("type", "STREAM");
132: streamConfiguration.setAttribute("model", "SYNCHRONOUS");
133:
134: sr = (StreamRepository) store.select(streamConfiguration);
135: or = (ObjectRepository) store.select(objectConfiguration);
136: lock = new Lock();
137: if (cacheKeys)
138: keys = Collections.synchronizedSet(new HashSet());
139:
140: //Finds non-matching pairs and deletes the extra files
141: HashSet streamKeys = new HashSet();
142: for (Iterator i = sr.list(); i.hasNext();) {
143: streamKeys.add(i.next());
144: }
145: HashSet objectKeys = new HashSet();
146: for (Iterator i = or.list(); i.hasNext();) {
147: objectKeys.add(i.next());
148: }
149:
150: Collection strandedStreams = (Collection) streamKeys
151: .clone();
152: strandedStreams.removeAll(objectKeys);
153: for (Iterator i = strandedStreams.iterator(); i.hasNext();) {
154: String key = (String) i.next();
155: remove(key);
156: }
157:
158: Collection strandedObjects = (Collection) objectKeys
159: .clone();
160: strandedObjects.removeAll(streamKeys);
161: for (Iterator i = strandedObjects.iterator(); i.hasNext();) {
162: String key = (String) i.next();
163: remove(key);
164: }
165:
166: if (keys != null) {
167: // Next get a list from the object repository
168: // and use that for the list of keys
169: keys.clear();
170: for (Iterator i = or.list(); i.hasNext();) {
171: keys.add(i.next());
172: }
173: }
174: if (getLogger().isDebugEnabled()) {
175: StringBuffer logBuffer = new StringBuffer(128).append(
176: this .getClass().getName()).append(
177: " created in ").append(destination);
178: getLogger().debug(logBuffer.toString());
179: }
180: } catch (Exception e) {
181: final String message = "Failed to retrieve Store component:"
182: + e.getMessage();
183: getLogger().error(message, e);
184: throw e;
185: }
186: }
187:
188: /**
189: * Releases a lock on a message identified by a key
190: *
191: * @param key the key of the message to be unlocked
192: *
193: * @return true if successfully released the lock, false otherwise
194: */
195: public boolean unlock(String key) {
196: if (lock.unlock(key)) {
197: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
198: StringBuffer debugBuffer = new StringBuffer(256)
199: .append("Unlocked ").append(key)
200: .append(" for ").append(
201: Thread.currentThread().getName())
202: .append(" @ ").append(
203: new java.util.Date(System
204: .currentTimeMillis()));
205: getLogger().debug(debugBuffer.toString());
206: }
207: return true;
208: } else {
209: return false;
210: }
211: }
212:
213: /**
214: * Obtains a lock on a message identified by a key
215: *
216: * @param key the key of the message to be locked
217: *
218: * @return true if successfully obtained the lock, false otherwise
219: */
220: public boolean lock(String key) {
221: if (lock.lock(key)) {
222: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
223: StringBuffer debugBuffer = new StringBuffer(256)
224: .append("Locked ").append(key).append(" for ")
225: .append(Thread.currentThread().getName())
226: .append(" @ ").append(
227: new java.util.Date(System
228: .currentTimeMillis()));
229: getLogger().debug(debugBuffer.toString());
230: }
231: // synchronized (this) {
232: // notifyAll();
233: // }
234: return true;
235: } else {
236: return false;
237: }
238: }
239:
240: /**
241: * Stores a message in this repository. Shouldn't this return the key
242: * under which it is stored?
243: *
244: * @param mc the mail message to store
245: */
246: public void store(Mail mc) throws MessagingException {
247: try {
248: String key = mc.getName();
249: //Remember whether this key was locked
250: boolean wasLocked = true;
251: synchronized (this ) {
252: wasLocked = lock.isLocked(key);
253:
254: if (!wasLocked) {
255: //If it wasn't locked, we want a lock during the store
256: lock(key);
257: }
258: }
259: try {
260: if (keys != null && !keys.contains(key)) {
261: keys.add(key);
262: }
263: boolean saveStream = true;
264:
265: MimeMessage message = mc.getMessage();
266: // if the message is a Copy on Write proxy we check the wrapped message
267: // to optimize the behaviour in case of MimeMessageWrapper
268: if (message instanceof MimeMessageCopyOnWriteProxy) {
269: MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) message;
270: message = messageCow.getWrappedMessage();
271: }
272: if (message instanceof MimeMessageWrapper) {
273: MimeMessageWrapper wrapper = (MimeMessageWrapper) message;
274: if (DEEP_DEBUG) {
275: System.out.println("Retrieving from: "
276: + wrapper.getSourceId());
277: StringBuffer debugBuffer = new StringBuffer(64)
278: .append("Saving to: ").append(
279: destination).append("/")
280: .append(mc.getName());
281: System.out.println(debugBuffer.toString());
282: System.out.println("Modified: "
283: + wrapper.isModified());
284: }
285: StringBuffer destinationBuffer = new StringBuffer(
286: 128).append(destination).append("/")
287: .append(mc.getName());
288: if (destinationBuffer.toString().equals(
289: wrapper.getSourceId())
290: && !wrapper.isModified()) {
291: //We're trying to save to the same place, and it's not modified... we shouldn't save.
292: //More importantly, if we try to save, we will create a 0-byte file since we're
293: //retrying to retrieve from a file we'll be overwriting.
294: saveStream = false;
295: }
296: }
297: if (saveStream) {
298: OutputStream out = null;
299: try {
300: out = sr.put(key);
301: mc.getMessage().writeTo(out);
302: } finally {
303: if (out != null)
304: out.close();
305: }
306: }
307: //Always save the header information
308: or.put(key, mc);
309: } finally {
310: if (!wasLocked) {
311: // If it wasn't locked, we need to unlock now
312: unlock(key);
313: synchronized (this ) {
314: notify();
315: }
316: }
317: }
318:
319: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
320: StringBuffer logBuffer = new StringBuffer(64).append(
321: "Mail ").append(key).append(" stored.");
322: getLogger().debug(logBuffer.toString());
323: }
324:
325: } catch (Exception e) {
326: getLogger().error("Exception storing mail: " + e, e);
327: throw new MessagingException(
328: "Exception caught while storing Message Container: "
329: + e);
330: }
331: }
332:
333: /**
334: * Retrieves a message given a key. At the moment, keys can be obtained
335: * from list() in superinterface Store.Repository
336: *
337: * @param key the key of the message to retrieve
338: * @return the mail corresponding to this key, null if none exists
339: */
340: public Mail retrieve(String key) throws MessagingException {
341: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
342: getLogger().debug("Retrieving mail: " + key);
343: }
344: try {
345: Mail mc = null;
346: try {
347: mc = (Mail) or.get(key);
348: } catch (RuntimeException re) {
349: StringBuffer exceptionBuffer = new StringBuffer(128);
350: if (re.getCause() instanceof Error) {
351: exceptionBuffer
352: .append(
353: "Error when retrieving mail, not deleting: ")
354: .append(re.toString());
355: } else {
356: exceptionBuffer.append(
357: "Exception retrieving mail: ").append(
358: re.toString()).append(
359: ", so we're deleting it.");
360: remove(key);
361: }
362: getLogger().warn(exceptionBuffer.toString());
363: return null;
364: }
365: MimeMessageAvalonSource source = new MimeMessageAvalonSource(
366: sr, destination, key);
367: mc.setMessage(new MimeMessageCopyOnWriteProxy(source));
368:
369: return mc;
370: } catch (Exception me) {
371: getLogger().error("Exception retrieving mail: " + me);
372: throw new MessagingException(
373: "Exception while retrieving mail: "
374: + me.getMessage());
375: }
376: }
377:
378: /**
379: * Removes a specified message
380: *
381: * @param mail the message to be removed from the repository
382: */
383: public void remove(Mail mail) throws MessagingException {
384: remove(mail.getName());
385: }
386:
387: /**
388: * Removes a Collection of mails from the repository
389: * @param mails The Collection of <code>MailImpl</code>'s to delete
390: * @throws MessagingException
391: * @since 2.2.0
392: */
393: public void remove(Collection mails) throws MessagingException {
394: Iterator delList = mails.iterator();
395: while (delList.hasNext()) {
396: remove((Mail) delList.next());
397: }
398: }
399:
400: /**
401: * Removes a message identified by key.
402: *
403: * @param key the key of the message to be removed from the repository
404: */
405: public void remove(String key) throws MessagingException {
406: if (lock(key)) {
407: try {
408: if (keys != null)
409: keys.remove(key);
410: sr.remove(key);
411: or.remove(key);
412: } finally {
413: unlock(key);
414: }
415: } else {
416: StringBuffer exceptionBuffer = new StringBuffer(64).append(
417: "Cannot lock ").append(key).append(" to remove it");
418: throw new MessagingException(exceptionBuffer.toString());
419: }
420: }
421:
422: /**
423: * List string keys of messages in repository.
424: *
425: * @return an <code>Iterator</code> over the list of keys in the repository
426: *
427: */
428: public Iterator list() {
429: // Fix ConcurrentModificationException by cloning
430: // the keyset before getting an iterator
431: final ArrayList clone;
432: if (keys != null)
433: synchronized (keys) {
434: clone = new ArrayList(keys);
435: }
436: else {
437: clone = new ArrayList();
438: for (Iterator i = or.list(); i.hasNext();) {
439: clone.add(i.next());
440: }
441: }
442: if (fifo)
443: Collections.sort(clone); // Keys is a HashSet; impose FIFO for apps that need it
444: return clone.iterator();
445: }
446: }
|