001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.transport;
019:
020: import org.apache.avalon.framework.activity.Initializable;
021: import org.apache.avalon.framework.activity.Disposable;
022: import org.apache.avalon.framework.container.ContainerUtil;
023: import org.apache.avalon.framework.logger.AbstractLogEnabled;
024: import org.apache.james.core.MailImpl;
025: import org.apache.james.core.MailetConfigImpl;
026: import org.apache.james.services.SpoolRepository;
027: import org.apache.mailet.GenericMailet;
028: import org.apache.mailet.GenericMatcher;
029: import org.apache.mailet.Mail;
030: import org.apache.mailet.MailAddress;
031: import org.apache.mailet.Mailet;
032: import org.apache.mailet.MailetConfig;
033: import org.apache.mailet.MailetException;
034: import org.apache.mailet.Matcher;
035:
036: import javax.mail.MessagingException;
037: import java.io.PrintWriter;
038: import java.io.StringWriter;
039: import java.util.ArrayList;
040: import java.util.Collection;
041: import java.util.LinkedList;
042: import java.util.List;
043: import java.util.Random;
044: import java.util.Iterator;
045: import java.util.Locale;
046:
047: /**
048: * Implements a processor for mails, directing the mail down
049: * the chain of matchers/mailets.
050: *
051: * SAMPLE CONFIGURATION
052: * <processor name="try" onerror="return,log">
053: * <mailet match="RecipientIsLocal" class="LocalDelivery">
054: * </mailet>
055: * <mailet match="All" class="RemoteDelivery">
056: * <delayTime>21600000</delayTime>
057: * <maxRetries>5</maxRetries>
058: * </mailet>
059: * </processor>
060: *
061: * Note that the 'onerror' attribute is not yet supported.
062: *
063: * As of James v2.2.0a5, 'onerror' functionality is implemented, but
064: * it is implemented on the <mailet> tag. The specification is:
065: *
066: * <mailet match="..." class="..."
067: * [onMatchException="{noMatch|matchAll|error|<aProcessorName>}"]
068: * [onMailetException="{ignore|error|<aProcessorName>}"]>
069: *
070: * noMatch: no addresses are considered to match
071: * matchAll: all addresses are considered to match
072: * error: as before, send the message to the ERROR processor
073: *
074: * Otherwise, a processor name can be specified, and the message will
075: * be sent there.
076: *
077: * <P>CVS $Id: LinearProcessor.java 494012 2007-01-08 10:23:58Z norman $</P>
078: * @version 2.2.0
079: */
080: public class LinearProcessor extends AbstractLogEnabled implements
081: Initializable, Disposable {
082:
083: private static final Random random = new Random(); // Used to generate new mail names
084:
085: /**
086: * The name of the matcher used to terminate the matcher chain. The
087: * end of the matcher/mailet chain must be a matcher that matches
088: * all mails and a mailet that sets every mail to GHOST status.
089: * This is necessary to ensure that mails are removed from the spool
090: * in an orderly fashion.
091: */
092: private static final String TERMINATING_MATCHER_NAME = "Terminating%Matcher%Name";
093:
094: /**
095: * The name of the mailet used to terminate the mailet chain. The
096: * end of the matcher/mailet chain must be a matcher that matches
097: * all mails and a mailet that sets every mail to GHOST status.
098: * This is necessary to ensure that mails are removed from the spool
099: * in an orderly fashion.
100: */
101: private static final String TERMINATING_MAILET_NAME = "Terminating%Mailet%Name";
102:
103: private List mailets; // The list of mailets for this processor
104: private List matchers; // The list of matchers for this processor
105: private volatile boolean listsClosed; // Whether the matcher/mailet lists have been closed.
106: private SpoolRepository spool; // The spool on which this processor is acting
107:
108: /**
109: * Set the spool to be used by this LinearProcessor.
110: *
111: * @param spool the spool to be used by this processor
112: *
113: * @throws IllegalArgumentException when the spool passed in is null
114: */
115: public void setSpool(SpoolRepository spool) {
116: if (spool == null) {
117: throw new IllegalArgumentException(
118: "The spool cannot be null");
119: }
120: this .spool = spool;
121: }
122:
123: /**
124: * @see org.apache.avalon.framework.activity.Initializable#initialize()
125: */
126: public void initialize() {
127: matchers = new ArrayList();
128: mailets = new ArrayList();
129: }
130:
131: /**
132: * <p>The dispose operation is called at the end of a components lifecycle.
133: * Instances of this class use this method to release and destroy any
134: * resources that they own.</p>
135: *
136: * <p>This implementation disposes of all the mailet instances added to the
137: * processor</p>
138: *
139: * @throws Exception if an error is encountered during shutdown
140: */
141: public void dispose() {
142: Iterator it = mailets.iterator();
143: boolean debugEnabled = getLogger().isDebugEnabled();
144: while (it.hasNext()) {
145: Mailet mailet = (Mailet) it.next();
146: if (debugEnabled) {
147: getLogger().debug(
148: "Shutdown mailet " + mailet.getMailetInfo());
149: }
150: mailet.destroy();
151: }
152: }
153:
154: /**
155: * <p>Adds a new <code>Matcher</code> / <code>Mailet</code> pair
156: * to the processor. Checks to ensure that the matcher and
157: * mailet passed in are not null. Synchronized to ensure that
158: * the matchers and mailets are kept in sync.</p>
159: *
160: * <p>It is an essential part of the contract of the LinearProcessor
161: * that a particular matcher/mailet combination be used to
162: * terminate the processor chain. This is done by calling the
163: * closeProcessorList method.</p>
164: *
165: * <p>Once the closeProcessorList has been called any subsequent
166: * call to the add method will result in an IllegalStateException.</p>
167: *
168: * <p>This method is synchronized to protect against corruption of
169: * matcher/mailets lists</p>
170: *
171: * @param matcher the new matcher being added
172: * @param mailet the new mailet being added
173: *
174: * @throws IllegalArgumentException when the matcher or mailet passed in is null
175: * @throws IllegalStateException when this method is called after the processor lists have been closed
176: */
177: public synchronized void add(Matcher matcher, Mailet mailet) {
178: if (matcher == null) {
179: throw new IllegalArgumentException(
180: "Null valued matcher passed to LinearProcessor.");
181: }
182: if (mailet == null) {
183: throw new IllegalArgumentException(
184: "Null valued mailet passed to LinearProcessor.");
185: }
186: if (listsClosed) {
187: throw new IllegalStateException(
188: "Attempt to add matcher/mailet after lists have been closed");
189: }
190: matchers.add(matcher);
191: mailets.add(mailet);
192: }
193:
194: /**
195: * <p>Closes the processor matcher/mailet list.</p>
196: *
197: * <p>This method is synchronized to protect against corruption of
198: * matcher/mailets lists</p>
199: *
200: * @throws IllegalStateException when this method is called after the processor lists have been closed
201: */
202: public synchronized void closeProcessorLists() {
203: if (listsClosed) {
204: throw new IllegalStateException(
205: "Processor's matcher/mailet lists have already been closed.");
206: }
207: Matcher terminatingMatcher = new GenericMatcher() {
208: public Collection match(Mail mail) {
209: return mail.getRecipients();
210: }
211:
212: public String getMatcherInfo() {
213: return TERMINATING_MATCHER_NAME;
214: }
215: };
216: Mailet terminatingMailet = new GenericMailet() {
217: public void service(Mail mail) {
218: if (!(Mail.ERROR.equals(mail.getState()))) {
219: // Don't complain if we fall off the end of the
220: // error processor. That is currently the
221: // normal situation for James, and the message
222: // will show up in the error store.
223: StringBuffer warnBuffer = new StringBuffer(256)
224: .append("Message ")
225: .append(mail.getName())
226: .append(
227: " reached the end of this processor, and is automatically deleted. This may indicate a configuration error.");
228: LinearProcessor.this .getLogger().warn(
229: warnBuffer.toString());
230: }
231: mail.setState(Mail.GHOST);
232: }
233:
234: public String getMailetInfo() {
235: return getMailetName();
236: }
237:
238: public String getMailetName() {
239: return TERMINATING_MAILET_NAME;
240: }
241: };
242: add(terminatingMatcher, terminatingMailet);
243: listsClosed = true;
244: }
245:
246: /**
247: * <p>Processes a single mail message through the chain of matchers and mailets.</p>
248: *
249: * <p>Calls to this method before setSpool has been called with a non-null argument
250: * will result in an <code>IllegalStateException</code>.</p>
251: *
252: * <p>If the matcher/mailet lists have not been closed by a call to the closeProcessorLists
253: * method then a call to this method will result in an <code>IllegalStateException</code>.
254: * The end of the matcher/mailet chain must be a matcher that matches all mails and
255: * a mailet that sets every mail to GHOST status. This is necessary to ensure that
256: * mails are removed from the spool in an orderly fashion. The closeProcessorLists method
257: * ensures this.</p>
258: *
259: * @param mail the new mail to be processed
260: *
261: * @throws IllegalStateException when this method is called before the processor lists have been closed
262: * or the spool has been initialized
263: */
264: public void service(Mail mail) throws MessagingException {
265: if (spool == null) {
266: throw new IllegalStateException(
267: "Attempt to service mail before the spool has been set to a non-null value");
268: }
269:
270: if (!listsClosed) {
271: throw new IllegalStateException(
272: "Attempt to service mail before matcher/mailet lists have been closed");
273: }
274:
275: if (getLogger().isDebugEnabled()) {
276: getLogger().debug("Servicing mail: " + mail.getName());
277: }
278: // unprocessed is an array of Lists of Mail objects
279: // the array indicates which matcher/mailet (stage in the linear
280: // processor) that this Mail needs to be processed.
281: // e.g., a Mail in unprocessed[0] needs to be
282: // processed by the first matcher/mailet.
283: //
284: // It is a List of Mail objects at each array spot as multiple Mail
285: // objects could be at the same stage.
286: //
287: // Note that every Mail object in this array will either be the
288: // original Mail object passed in, or a result of this method's
289: // (and hence this thread's) processing.
290:
291: List[] unprocessed = new List[matchers.size() + 1];
292:
293: for (int i = 0; i < unprocessed.length; i++) {
294: // No need to use synchronization, as this is totally
295: // local to the method
296: unprocessed[i] = new LinkedList();
297: }
298:
299: //Add the object to the bottom of the list
300: unprocessed[0].add(mail);
301:
302: //This is the original state of the message
303: String originalState = mail.getState();
304:
305: // The original mail: we should not care to save this mail.
306: // This should be saved in the spoolmanager.
307: Mail originalMail = mail;
308:
309: //We'll use these as temporary variables in the loop
310: mail = null; // the message we're currently processing
311: int i = 0; // where in the stage we're looking
312: while (true) {
313: // The last element in the unprocessed array has mail messages
314: // that have completed all stages. We want them to just die,
315: // so we clear that spot to allow garbage collection of the
316: // objects.
317: //
318: // Please note that the presence of the terminating mailet at the end
319: // of the chain is critical to the proper operation
320: // of the LinearProcessor code. If this mailet is not placed
321: // at the end of the chain with a terminating matcher, there is a
322: // potential for configuration or implementation errors to
323: // lead to mails trapped in the spool. This matcher/mailet
324: // combination is added when the closeProcessorList method
325: // is called.
326: unprocessed[unprocessed.length - 1].clear();
327:
328: //initialize the mail reference we will be searching on
329: mail = null;
330:
331: //Scan through all stages, trying to find a message to process
332: for (i = 0; i < unprocessed.length; i++) {
333: if (unprocessed[i].size() > 0) {
334: //Get the first element from the queue, and remove it from there
335: mail = (Mail) unprocessed[i].remove(0);
336: break;
337: }
338: }
339:
340: //Check it we found anything
341: if (mail == null) {
342: //We found no messages to process... we're done servicing the mail object
343: return;
344: }
345:
346: //Call the matcher and find what recipients match
347: Collection recipients = null;
348: Matcher matcher = (Matcher) matchers.get(i);
349: StringBuffer logMessageBuffer = null;
350: if (getLogger().isDebugEnabled()) {
351: logMessageBuffer = new StringBuffer(128).append(
352: "Checking ").append(mail.getName()).append(
353: " with ").append(matcher);
354: getLogger().debug(logMessageBuffer.toString());
355: }
356: try {
357: recipients = matcher.match(mail);
358: if (recipients == null) {
359: //In case the matcher returned null, create an empty Collection
360: recipients = new ArrayList(0);
361: } else if (recipients != mail.getRecipients()) {
362: //Make sure all the objects are MailAddress objects
363: verifyMailAddresses(recipients);
364: }
365: } catch (MessagingException me) {
366: // look in the matcher's mailet's init attributes
367: MailetConfig mailetConfig = ((Mailet) mailets.get(i))
368: .getMailetConfig();
369: String onMatchException = ((MailetConfigImpl) mailetConfig)
370: .getInitAttribute("onMatchException");
371: if (onMatchException == null) {
372: onMatchException = Mail.ERROR;
373: } else {
374: onMatchException = onMatchException.trim()
375: .toLowerCase(Locale.US);
376: }
377: if (onMatchException.compareTo("nomatch") == 0) {
378: //In case the matcher returned null, create an empty Collection
379: recipients = new ArrayList(0);
380: } else if (onMatchException.compareTo("matchall") == 0) {
381: recipients = mail.getRecipients();
382: // no need to verify addresses
383: } else {
384: handleException(me, mail, matcher
385: .getMatcherConfig().getMatcherName(),
386: onMatchException);
387: }
388: }
389:
390: // Split the recipients into two pools. notRecipients will contain the
391: // recipients on the message that the matcher did not return.
392: Collection notRecipients;
393: if (recipients == mail.getRecipients()
394: || recipients.size() == 0) {
395: notRecipients = new ArrayList(0);
396: } else {
397: notRecipients = new ArrayList(mail.getRecipients());
398: notRecipients.removeAll(recipients);
399: }
400:
401: if (recipients.size() == 0) {
402: //Everything was not a match... store it in the next spot in the array
403: unprocessed[i + 1].add(mail);
404: continue;
405: }
406: if (notRecipients.size() != 0) {
407: // There are a mix of recipients and not recipients.
408: // We need to clone this message, put the notRecipients on the clone
409: // and store it in the next spot
410: Mail notMail = new MailImpl(mail, newName(mail));
411: notMail.setRecipients(notRecipients);
412: // set the state to the current processor
413: notMail.setState(originalState);
414: unprocessed[i + 1].add(notMail);
415: //We have to set the reduce possible recipients on the old message
416: mail.setRecipients(recipients);
417: }
418: // We have messages that need to process... time to run the mailet.
419: Mailet mailet = (Mailet) mailets.get(i);
420: if (getLogger().isDebugEnabled()) {
421: logMessageBuffer = new StringBuffer(128).append(
422: "Servicing ").append(mail.getName()).append(
423: " by ").append(mailet.getMailetInfo());
424: getLogger().debug(logMessageBuffer.toString());
425: }
426: try {
427: mailet.service(mail);
428: // Make sure all the recipients are still MailAddress objects
429: verifyMailAddresses(mail.getRecipients());
430: } catch (MessagingException me) {
431: MailetConfig mailetConfig = mailet.getMailetConfig();
432: String onMailetException = ((MailetConfigImpl) mailetConfig)
433: .getInitAttribute("onMailetException");
434: if (onMailetException == null) {
435: onMailetException = Mail.ERROR;
436: } else {
437: onMailetException = onMailetException.trim()
438: .toLowerCase(Locale.US);
439: }
440: if (onMailetException.compareTo("ignore") == 0) {
441: // ignore the exception and continue
442: // this option should not be used if the mail object can be changed by the mailet
443: verifyMailAddresses(mail.getRecipients());
444: } else {
445: handleException(me, mail, mailet.getMailetConfig()
446: .getMailetName(), onMailetException);
447: }
448: }
449:
450: // See if the state was changed by the mailet
451: if (!mail.getState().equals(originalState)) {
452: //If this message was ghosted, we just want to let it die
453: if (mail.getState().equals(Mail.GHOST)) {
454: // let this instance die...
455: ContainerUtil.dispose(mail);
456: mail = null;
457: continue;
458: }
459: // This was just set to another state requiring further processing...
460: // Store this back in the spool and it will get picked up and
461: // run in that processor
462: // We store only mails created by the matcher "splitting"
463: // The original mail will be "stored" by the caller.
464: if (originalMail != mail) {
465: spool.store(mail);
466: ContainerUtil.dispose(mail);
467: }
468: mail = null;
469: continue;
470: } else {
471: // Ok, we made it through with the same state... move it to the next
472: // spot in the array
473: unprocessed[i + 1].add(mail);
474: }
475:
476: }
477: }
478:
479: /**
480: * Create a unique new primary key name.
481: *
482: * @param mail the mail to use as the basis for the new mail name
483: *
484: * @return a new name
485: */
486: private String newName(Mail mail) {
487: StringBuffer nameBuffer = new StringBuffer(64).append(
488: mail.getName()).append("-!").append(
489: random.nextInt(1048576));
490: return nameBuffer.toString();
491: }
492:
493: /**
494: * Checks that all objects in this class are of the form MailAddress.
495: *
496: * @throws MessagingException when the <code>Collection</code> contains objects that are not <code>MailAddress</code> objects
497: */
498: private void verifyMailAddresses(Collection col)
499: throws MessagingException {
500: try {
501: MailAddress addresses[] = (MailAddress[]) col
502: .toArray(new MailAddress[0]);
503:
504: // Why is this here? According to the javadoc for
505: // java.util.Collection.toArray(Object[]), this should
506: // never happen. The exception will be thrown.
507: if (addresses.length != col.size()) {
508: throw new MailetException(
509: "The recipient list contains objects other than MailAddress objects");
510: }
511: } catch (ArrayStoreException ase) {
512: throw new MailetException(
513: "The recipient list contains objects other than MailAddress objects");
514: }
515: }
516:
517: /**
518: * This is a helper method that updates the state of the mail object to
519: * Mail.ERROR as well as recording the exception to the log
520: *
521: * @param me the exception to be handled
522: * @param mail the mail being processed when the exception was generated
523: * @param offendersName the matcher or mailet than generated the exception
524: * @param nextState the next state to set
525: *
526: * @throws MessagingException thrown always, rethrowing the passed in exception
527: */
528: private void handleException(MessagingException me, Mail mail,
529: String offendersName, String nextState)
530: throws MessagingException {
531: System.err.println("exception! " + me);
532: mail.setState(nextState);
533: StringWriter sout = new StringWriter();
534: PrintWriter out = new PrintWriter(sout, true);
535: StringBuffer exceptionBuffer = new StringBuffer(128).append(
536: "Exception calling ").append(offendersName)
537: .append(": ").append(me.getMessage());
538: out.println(exceptionBuffer.toString());
539: Exception e = me;
540: while (e != null) {
541: e.printStackTrace(out);
542: if (e instanceof MessagingException) {
543: e = ((MessagingException) e).getNextException();
544: } else {
545: e = null;
546: }
547: }
548: String errorString = sout.toString();
549: mail.setErrorMessage(errorString);
550: getLogger().error(errorString);
551: throw me;
552: }
553: }
|