001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.test.mock.james;
019:
020: import org.apache.james.services.SpoolRepository;
021: import org.apache.james.test.mock.avalon.MockLogger;
022: import org.apache.james.util.Lock;
023: import org.apache.mailet.Mail;
024:
025: import javax.mail.MessagingException;
026:
027: import java.util.ArrayList;
028: import java.util.Collection;
029: import java.util.ConcurrentModificationException;
030: import java.util.Hashtable;
031: import java.util.Iterator;
032:
033: /**
034: * Implementation of a MailRepository on a FileSystem.
035: *
036: * Requires a configuration element in the .conf.xml file of the form:
037: * <repository destinationURL="file://path-to-root-dir-for-repository"
038: * type="MAIL"
039: * model="SYNCHRONOUS"/>
040: * Requires a logger called MailRepository.
041: *
042: * @version 1.0.0, 24/04/1999
043: */
044: public class InMemorySpoolRepository implements SpoolRepository {
045:
046: /**
047: * Whether 'deep debugging' is turned on.
048: */
049: protected final static boolean DEEP_DEBUG = true;
050: private Lock lock;
051: private MockLogger logger;
052: private Hashtable spool;
053:
054: private MockLogger getLogger() {
055: if (logger == null) {
056: logger = new MockLogger();
057: }
058: return logger;
059: }
060:
061: /**
062: * Releases a lock on a message identified by a key
063: *
064: * @param key the key of the message to be unlocked
065: *
066: * @return true if successfully released the lock, false otherwise
067: */
068: public boolean unlock(String key) {
069: if (lock.unlock(key)) {
070: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
071: StringBuffer debugBuffer = new StringBuffer(256)
072: .append("Unlocked ").append(key)
073: .append(" for ").append(
074: Thread.currentThread().getName())
075: .append(" @ ").append(
076: new java.util.Date(System
077: .currentTimeMillis()));
078: getLogger().debug(debugBuffer.toString());
079: }
080: return true;
081: } else {
082: return false;
083: }
084: }
085:
086: /**
087: * Obtains a lock on a message identified by a key
088: *
089: * @param key the key of the message to be locked
090: *
091: * @return true if successfully obtained the lock, false otherwise
092: */
093: public boolean lock(String key) {
094: if (lock.lock(key)) {
095: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
096: StringBuffer debugBuffer = new StringBuffer(256)
097: .append("Locked ").append(key).append(" for ")
098: .append(Thread.currentThread().getName())
099: .append(" @ ").append(
100: new java.util.Date(System
101: .currentTimeMillis()));
102: getLogger().debug(debugBuffer.toString());
103: }
104: // synchronized (this) {
105: // notifyAll();
106: // }
107: return true;
108: } else {
109: return false;
110: }
111: }
112:
113: /**
114: * Stores a message in this repository. Shouldn't this return the key
115: * under which it is stored?
116: *
117: * @param mc the mail message to store
118: */
119: public void store(Mail mc) throws MessagingException {
120: try {
121: String key = mc.getName();
122: //Remember whether this key was locked
123: boolean wasLocked = true;
124: synchronized (this ) {
125: wasLocked = lock.isLocked(key);
126:
127: if (!wasLocked) {
128: //If it wasn't locked, we want a lock during the store
129: lock(key);
130: }
131: }
132: try {
133: spool.put(key, mc);
134: } finally {
135: if (!wasLocked) {
136: // If it wasn't locked, we need to unlock now
137: unlock(key);
138: synchronized (this ) {
139: notify();
140: }
141: }
142: }
143:
144: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
145: StringBuffer logBuffer = new StringBuffer(64).append(
146: "Mail ").append(key).append(" stored.");
147: getLogger().debug(logBuffer.toString());
148: }
149:
150: } catch (Exception e) {
151: getLogger().error("Exception storing mail: " + e);
152: throw new MessagingException(
153: "Exception caught while storing Message Container: ",
154: e);
155: }
156: }
157:
158: /**
159: * Retrieves a message given a key. At the moment, keys can be obtained
160: * from list() in superinterface Store.Repository
161: *
162: * @param key the key of the message to retrieve
163: * @return the mail corresponding to this key, null if none exists
164: */
165: public Mail retrieve(String key) throws MessagingException {
166: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
167: getLogger().debug("Retrieving mail: " + key);
168: }
169: try {
170: Mail mc = null;
171: try {
172: mc = (Mail) spool.get(key);
173: } catch (RuntimeException re) {
174: StringBuffer exceptionBuffer = new StringBuffer(128);
175: if (re.getCause() instanceof Error) {
176: exceptionBuffer
177: .append(
178: "Error when retrieving mail, not deleting: ")
179: .append(re.toString());
180: } else {
181: exceptionBuffer.append(
182: "Exception retrieving mail: ").append(
183: re.toString()).append(
184: ", so we're deleting it.");
185: remove(key);
186: }
187: getLogger().warn(exceptionBuffer.toString());
188: return null;
189: }
190: return mc;
191: } catch (Exception me) {
192: getLogger().error("Exception retrieving mail: " + me);
193: throw new MessagingException(
194: "Exception while retrieving mail: "
195: + me.getMessage());
196: }
197: }
198:
199: /**
200: * Removes a specified message
201: *
202: * @param mail the message to be removed from the repository
203: */
204: public void remove(Mail mail) throws MessagingException {
205: remove(mail.getName());
206: }
207:
208: /**
209: * Removes a Collection of mails from the repository
210: * @param mails The Collection of <code>MailImpl</code>'s to delete
211: * @throws MessagingException
212: * @since 2.2.0
213: */
214: public void remove(Collection mails) throws MessagingException {
215: Iterator delList = mails.iterator();
216: while (delList.hasNext()) {
217: remove((Mail) delList.next());
218: }
219: }
220:
221: /**
222: * Removes a message identified by key.
223: *
224: * @param key the key of the message to be removed from the repository
225: */
226: public void remove(String key) throws MessagingException {
227: if (lock(key)) {
228: try {
229: if (spool != null)
230: spool.remove(key);
231: } finally {
232: unlock(key);
233: }
234: } else {
235: StringBuffer exceptionBuffer = new StringBuffer(64).append(
236: "Cannot lock ").append(key).append(" to remove it");
237: throw new MessagingException(exceptionBuffer.toString());
238: }
239: }
240:
241: /**
242: * List string keys of messages in repository.
243: *
244: * @return an <code>Iterator</code> over the list of keys in the repository
245: *
246: */
247: public Iterator list() {
248: // Fix ConcurrentModificationException by cloning
249: // the keyset before getting an iterator
250: final ArrayList clone;
251: synchronized (spool) {
252: clone = new ArrayList(spool.keySet());
253: }
254: return clone.iterator();
255: }
256:
257: /**
258: * <p>Returns an arbitrarily selected mail deposited in this Repository.
259: * Usage: SpoolManager calls accept() to see if there are any unprocessed
260: * mails in the spool repository.</p>
261: *
262: * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
263: *
264: * @return the mail
265: */
266: public synchronized Mail accept() throws InterruptedException {
267: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
268: getLogger().debug("Method accept() called");
269: }
270: return accept(new SpoolRepository.AcceptFilter() {
271: public boolean accept(String _, String __, long ___,
272: String ____) {
273: return true;
274: }
275:
276: public long getWaitTime() {
277: return 0;
278: }
279: });
280: }
281:
282: /**
283: * <p>Returns an arbitrarily selected mail deposited in this Repository that
284: * is either ready immediately for delivery, or is younger than it's last_updated plus
285: * the number of failed attempts times the delay time.
286: * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
287: * unprocessed mail is available.</p>
288: *
289: * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
290: *
291: * @return the mail
292: */
293: public synchronized Mail accept(final long delay)
294: throws InterruptedException {
295: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
296: getLogger().debug("Method accept(delay) called");
297: }
298: return accept(new SpoolRepository.AcceptFilter() {
299: long youngest = 0;
300:
301: public boolean accept(String key, String state,
302: long lastUpdated, String errorMessage) {
303: if (state.equals(Mail.ERROR)) {
304: //Test the time...
305: long timeToProcess = delay + lastUpdated;
306:
307: if (System.currentTimeMillis() > timeToProcess) {
308: //We're ready to process this again
309: return true;
310: } else {
311: //We're not ready to process this.
312: if (youngest == 0 || youngest > timeToProcess) {
313: //Mark this as the next most likely possible mail to process
314: youngest = timeToProcess;
315: }
316: return false;
317: }
318: } else {
319: //This mail is good to go... return the key
320: return true;
321: }
322: }
323:
324: public long getWaitTime() {
325: if (youngest == 0) {
326: return 0;
327: } else {
328: long duration = youngest
329: - System.currentTimeMillis();
330: youngest = 0; //get ready for next round
331: return duration <= 0 ? 1 : duration;
332: }
333: }
334: });
335: }
336:
337: /**
338: * Returns an arbitrarily select mail deposited in this Repository for
339: * which the supplied filter's accept method returns true.
340: * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
341: * based on number of retries if the mail is ready for processing.
342: * If no message is ready the method will block until one is, the amount of time to block is
343: * determined by calling the filters getWaitTime method.
344: *
345: * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
346: *
347: * @return the mail
348: */
349: public synchronized Mail accept(SpoolRepository.AcceptFilter filter)
350: throws InterruptedException {
351: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
352: getLogger().debug("Method accept(Filter) called");
353: }
354: while (!Thread.currentThread().isInterrupted())
355: try {
356: for (Iterator it = list(); it.hasNext();) {
357: String s = it.next().toString();
358: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
359: StringBuffer logBuffer = new StringBuffer(64)
360: .append("Found item ").append(s)
361: .append(" in spool.");
362: getLogger().debug(logBuffer.toString());
363: }
364: if (lock(s)) {
365: if ((DEEP_DEBUG)
366: && (getLogger().isDebugEnabled())) {
367: getLogger().debug(
368: "accept(Filter) has locked: " + s);
369: }
370: try {
371: Mail mail = retrieve(s);
372: // Retrieve can return null if the mail is no longer on the spool
373: // (i.e. another thread has gotten to it first).
374: // In this case we simply continue to the next key
375: if (mail == null
376: || !filter.accept(mail.getName(),
377: mail.getState(), mail
378: .getLastUpdated()
379: .getTime(), mail
380: .getErrorMessage())) {
381: unlock(s);
382: continue;
383: }
384: return mail;
385: } catch (javax.mail.MessagingException e) {
386: unlock(s);
387: getLogger().error(
388: "Exception during retrieve -- skipping item "
389: + s, e);
390: }
391: }
392: }
393:
394: //We did not find any... let's wait for a certain amount of time
395: wait(filter.getWaitTime());
396: } catch (InterruptedException ex) {
397: throw ex;
398: } catch (ConcurrentModificationException cme) {
399: // Should never get here now that list methods clones keyset for iterator
400: getLogger()
401: .error(
402: "CME in spooler - please report to http://james.apache.org",
403: cme);
404: }
405: throw new InterruptedException();
406: }
407:
408: /**
409: *
410: */
411: public InMemorySpoolRepository() {
412: spool = new Hashtable();
413: lock = new Lock();
414: }
415:
416: public int size() {
417: return spool.size();
418: }
419:
420: public void clear() {
421: spool.clear();
422: }
423:
424: }
|