001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.mailrepository;
019:
020: import org.apache.james.services.SpoolRepository;
021: import org.apache.mailet.Mail;
022:
023: import java.util.ConcurrentModificationException;
024: import java.util.Iterator;
025:
026: /**
027: * Implementation of a MailRepository on a FileSystem.
028: *
029: * Requires a configuration element in the .conf.xml file of the form:
030: * <repository destinationURL="file://path-to-root-dir-for-repository"
031: * type="MAIL"
032: * model="SYNCHRONOUS"/>
033: * Requires a logger called MailRepository.
034: *
035: * @version 1.0.0, 24/04/1999
036: */
037: public class AvalonSpoolRepository extends AvalonMailRepository
038: implements SpoolRepository {
039:
040: /**
041: * <p>Returns an arbitrarily selected mail deposited in this Repository.
042: * Usage: SpoolManager calls accept() to see if there are any unprocessed
043: * mails in the spool repository.</p>
044: *
045: * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
046: *
047: * @return the mail
048: */
049: public synchronized Mail accept() throws InterruptedException {
050: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
051: getLogger().debug("Method accept() called");
052: }
053: return accept(new SpoolRepository.AcceptFilter() {
054: public boolean accept(String _, String __, long ___,
055: String ____) {
056: return true;
057: }
058:
059: public long getWaitTime() {
060: return 0;
061: }
062: });
063: }
064:
065: /**
066: * <p>Returns an arbitrarily selected mail deposited in this Repository that
067: * is either ready immediately for delivery, or is younger than it's last_updated plus
068: * the number of failed attempts times the delay time.
069: * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
070: * unprocessed mail is available.</p>
071: *
072: * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
073: *
074: * @return the mail
075: */
076: public synchronized Mail accept(final long delay)
077: throws InterruptedException {
078: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
079: getLogger().debug("Method accept(delay) called");
080: }
081: return accept(new SpoolRepository.AcceptFilter() {
082: long youngest = 0;
083:
084: public boolean accept(String key, String state,
085: long lastUpdated, String errorMessage) {
086: if (state.equals(Mail.ERROR)) {
087: //Test the time...
088: long timeToProcess = delay + lastUpdated;
089:
090: if (System.currentTimeMillis() > timeToProcess) {
091: //We're ready to process this again
092: return true;
093: } else {
094: //We're not ready to process this.
095: if (youngest == 0 || youngest > timeToProcess) {
096: //Mark this as the next most likely possible mail to process
097: youngest = timeToProcess;
098: }
099: return false;
100: }
101: } else {
102: //This mail is good to go... return the key
103: return true;
104: }
105: }
106:
107: public long getWaitTime() {
108: if (youngest == 0) {
109: return 0;
110: } else {
111: long duration = youngest
112: - System.currentTimeMillis();
113: youngest = 0; //get ready for next round
114: return duration <= 0 ? 1 : duration;
115: }
116: }
117: });
118: }
119:
120: /**
121: * Returns an arbitrarily select mail deposited in this Repository for
122: * which the supplied filter's accept method returns true.
123: * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
124: * based on number of retries if the mail is ready for processing.
125: * If no message is ready the method will block until one is, the amount of time to block is
126: * determined by calling the filters getWaitTime method.
127: *
128: * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
129: *
130: * @return the mail
131: */
132: public synchronized Mail accept(SpoolRepository.AcceptFilter filter)
133: throws InterruptedException {
134: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
135: getLogger().debug("Method accept(Filter) called");
136: }
137: while (!Thread.currentThread().isInterrupted())
138: try {
139: for (Iterator it = list(); it.hasNext();) {
140: String s = it.next().toString();
141: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
142: StringBuffer logBuffer = new StringBuffer(64)
143: .append("Found item ").append(s)
144: .append(" in spool.");
145: getLogger().debug(logBuffer.toString());
146: }
147: if (lock(s)) {
148: if ((DEEP_DEBUG)
149: && (getLogger().isDebugEnabled())) {
150: getLogger().debug(
151: "accept(Filter) has locked: " + s);
152: }
153: try {
154: Mail mail = retrieve(s);
155: // Retrieve can return null if the mail is no longer on the spool
156: // (i.e. another thread has gotten to it first).
157: // In this case we simply continue to the next key
158: if (mail == null
159: || !filter.accept(mail.getName(),
160: mail.getState(), mail
161: .getLastUpdated()
162: .getTime(), mail
163: .getErrorMessage())) {
164: unlock(s);
165: continue;
166: }
167: return mail;
168: } catch (javax.mail.MessagingException e) {
169: unlock(s);
170: getLogger().error(
171: "Exception during retrieve -- skipping item "
172: + s, e);
173: }
174: }
175: }
176:
177: //We did not find any... let's wait for a certain amount of time
178: wait(filter.getWaitTime());
179: } catch (InterruptedException ex) {
180: throw ex;
181: } catch (ConcurrentModificationException cme) {
182: // Should never get here now that list methods clones keyset for iterator
183: getLogger()
184: .error(
185: "CME in spooler - please report to http://james.apache.org",
186: cme);
187: }
188: throw new InterruptedException();
189: }
190:
191: }
|