001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.transport;
019:
020: import org.apache.avalon.framework.activity.Disposable;
021: import org.apache.avalon.framework.activity.Initializable;
022: import org.apache.avalon.framework.configuration.Configurable;
023: import org.apache.avalon.framework.configuration.Configuration;
024: import org.apache.avalon.framework.configuration.ConfigurationException;
025: import org.apache.avalon.framework.container.ContainerUtil;
026: import org.apache.avalon.framework.logger.AbstractLogEnabled;
027: import org.apache.avalon.framework.service.DefaultServiceManager;
028: import org.apache.avalon.framework.service.ServiceException;
029: import org.apache.avalon.framework.service.ServiceManager;
030: import org.apache.avalon.framework.service.Serviceable;
031: import org.apache.james.services.MailetLoader;
032: import org.apache.james.services.MatcherLoader;
033: import org.apache.james.services.SpoolRepository;
034: import org.apache.mailet.Mail;
035: import org.apache.mailet.Mailet;
036: import org.apache.mailet.MailetException;
037: import org.apache.mailet.Matcher;
038:
039: import javax.mail.MessagingException;
040:
041: import java.util.Collection;
042: import java.util.HashMap;
043: import java.util.Iterator;
044:
045: /**
046: * Manages the mail spool. This class is responsible for retrieving
047: * messages from the spool, directing messages to the appropriate
048: * processor, and removing them from the spool when processing is
049: * complete.
050: *
051: * @version CVS $Revision: 494012 $ $Date: 2007-01-08 11:23:58 +0100 (Mo, 08 Jan 2007) $
052: */
053: public class JamesSpoolManager extends AbstractLogEnabled implements
054: Serviceable, Configurable, Initializable, Runnable, Disposable {
055:
056: /**
057: * System component manager
058: */
059: private DefaultServiceManager compMgr;
060:
061: /**
062: * The configuration object used by this spool manager.
063: */
064: private Configuration conf;
065:
066: /**
067: * The spool that this manager will process
068: */
069: private SpoolRepository spool;
070:
071: /**
072: * The map of processor names to processors
073: */
074: private HashMap processors;
075:
076: /**
077: * The number of threads used to move mail through the spool.
078: */
079: private int numThreads;
080:
081: /**
082: * The ThreadPool containing worker threads.
083: *
084: * This used to be used, but for threads that lived the entire
085: * lifespan of the application. Currently commented out. In
086: * the future, we could use a thread pool to run short-lived
087: * workers, so that we have a smaller number of readers that
088: * accept a message from the spool, and dispatch to a pool of
089: * worker threads that process the message.
090: */
091: // private ThreadPool workerPool;
092: /**
093: * The ThreadManager from which the thread pool is obtained.
094: */
095: // private ThreadManager threadManager;
096: /**
097: * Number of active threads
098: */
099: private int numActive;
100:
101: /**
102: * Spool threads are active
103: */
104: private boolean active;
105:
106: /**
107: * Spool threads
108: */
109: private Collection spoolThreads;
110:
111: /**
112: * @see org.apache.avalon.framework.service.Serviceable#service(ServiceManager)
113: */
114: public void service(ServiceManager comp) throws ServiceException {
115: // threadManager = (ThreadManager) comp.lookup(ThreadManager.ROLE);
116: compMgr = new DefaultServiceManager(comp);
117: }
118:
119: /**
120: * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
121: */
122: public void configure(Configuration conf)
123: throws ConfigurationException {
124: this .conf = conf;
125: numThreads = conf.getChild("threads").getValueAsInteger(1);
126: }
127:
128: /**
129: * @see org.apache.avalon.framework.activity.Initializable#initialize()
130: */
131: public void initialize() throws Exception {
132:
133: getLogger().info("JamesSpoolManager init...");
134: spool = (SpoolRepository) compMgr.lookup(SpoolRepository.ROLE);
135:
136: MailetLoader mailetLoader = (MailetLoader) compMgr
137: .lookup(MailetLoader.ROLE);
138: MatcherLoader matchLoader = (MatcherLoader) compMgr
139: .lookup(MatcherLoader.ROLE);
140:
141: //A processor is a Collection of
142: processors = new HashMap();
143:
144: final Configuration[] processorConfs = conf
145: .getChildren("processor");
146: for (int i = 0; i < processorConfs.length; i++) {
147: Configuration processorConf = processorConfs[i];
148: String processorName = processorConf.getAttribute("name");
149: try {
150: LinearProcessor processor = new LinearProcessor();
151: setupLogger(processor, processorName);
152: processor.setSpool(spool);
153: processor.initialize();
154: processors.put(processorName, processor);
155:
156: final Configuration[] mailetConfs = processorConf
157: .getChildren("mailet");
158: // Loop through the mailet configuration, load
159: // all of the matcher and mailets, and add
160: // them to the processor.
161: for (int j = 0; j < mailetConfs.length; j++) {
162: Configuration c = mailetConfs[j];
163: String mailetClassName = c.getAttribute("class");
164: String matcherName = c.getAttribute("match");
165: Mailet mailet = null;
166: Matcher matcher = null;
167: try {
168: matcher = matchLoader.getMatcher(matcherName);
169: //The matcher itself should log that it's been inited.
170: if (getLogger().isInfoEnabled()) {
171: StringBuffer infoBuffer = new StringBuffer(
172: 64).append("Matcher ").append(
173: matcherName).append(
174: " instantiated.");
175: getLogger().info(infoBuffer.toString());
176: }
177: } catch (MessagingException ex) {
178: // **** Do better job printing out exception
179: if (getLogger().isErrorEnabled()) {
180: StringBuffer errorBuffer = new StringBuffer(
181: 256).append(
182: "Unable to init matcher ").append(
183: matcherName).append(": ").append(
184: ex.toString());
185: getLogger().error(errorBuffer.toString(),
186: ex);
187: if (ex.getNextException() != null) {
188: getLogger().error(
189: "Caused by nested exception: ",
190: ex.getNextException());
191: }
192: }
193: System.err.println("Unable to init matcher "
194: + matcherName);
195: System.err
196: .println("Check spool manager logs for more details.");
197: //System.exit(1);
198: throw ex;
199: }
200: try {
201: mailet = mailetLoader.getMailet(
202: mailetClassName, c);
203: if (getLogger().isInfoEnabled()) {
204: StringBuffer infoBuffer = new StringBuffer(
205: 64).append("Mailet ").append(
206: mailetClassName).append(
207: " instantiated.");
208: getLogger().info(infoBuffer.toString());
209: }
210: } catch (MessagingException ex) {
211: // **** Do better job printing out exception
212: if (getLogger().isErrorEnabled()) {
213: StringBuffer errorBuffer = new StringBuffer(
214: 256).append(
215: "Unable to init mailet ").append(
216: mailetClassName).append(": ")
217: .append(ex.toString());
218: getLogger().error(errorBuffer.toString(),
219: ex);
220: if (ex.getNextException() != null) {
221: getLogger().error(
222: "Caused by nested exception: ",
223: ex.getNextException());
224: }
225: }
226: System.err.println("Unable to init mailet "
227: + mailetClassName);
228: System.err
229: .println("Check spool manager logs for more details.");
230: //System.exit(1);
231: throw ex;
232: }
233: //Add this pair to the processor
234: processor.add(matcher, mailet);
235: }
236:
237: // Close the processor matcher/mailet lists.
238: //
239: // Please note that this is critical to the proper operation
240: // of the LinearProcessor code. The processor will not be
241: // able to service mails until this call is made.
242: processor.closeProcessorLists();
243:
244: if (getLogger().isInfoEnabled()) {
245: StringBuffer infoBuffer = new StringBuffer(64)
246: .append("Processor ").append(processorName)
247: .append(" instantiated.");
248: getLogger().info(infoBuffer.toString());
249: }
250: } catch (Exception ex) {
251: if (getLogger().isErrorEnabled()) {
252: StringBuffer errorBuffer = new StringBuffer(256)
253: .append("Unable to init processor ")
254: .append(processorName).append(": ").append(
255: ex.toString());
256: getLogger().error(errorBuffer.toString(), ex);
257: }
258: throw ex;
259: }
260: }
261: if (getLogger().isInfoEnabled()) {
262: StringBuffer infoBuffer = new StringBuffer(64).append(
263: "Spooler Manager uses ").append(numThreads).append(
264: " Thread(s)");
265: getLogger().info(infoBuffer.toString());
266: }
267:
268: active = true;
269: numActive = 0;
270: spoolThreads = new java.util.ArrayList(numThreads);
271: for (int i = 0; i < numThreads; i++) {
272: Thread reader = new Thread(this , "Spool Thread #" + i);
273: spoolThreads.add(reader);
274: reader.start();
275: }
276: }
277:
278: /**
279: * This routinely checks the message spool for messages, and processes
280: * them as necessary
281: */
282: public void run() {
283:
284: if (getLogger().isInfoEnabled()) {
285: getLogger().info(
286: "Run JamesSpoolManager: "
287: + Thread.currentThread().getName());
288: getLogger().info("Spool=" + spool.getClass().getName());
289: }
290:
291: numActive++;
292: while (active) {
293: String key = null;
294: try {
295: Mail mail = (Mail) spool.accept();
296: key = mail.getName();
297: if (getLogger().isDebugEnabled()) {
298: StringBuffer debugBuffer = new StringBuffer(64)
299: .append("==== Begin processing mail ")
300: .append(mail.getName()).append("====");
301: getLogger().debug(debugBuffer.toString());
302: }
303: process(mail);
304: // Only remove an email from the spool is processing is
305: // complete, or if it has no recipients
306: if ((Mail.GHOST.equals(mail.getState()))
307: || (mail.getRecipients() == null)
308: || (mail.getRecipients().size() == 0)) {
309: ContainerUtil.dispose(mail);
310: spool.remove(key);
311: if (getLogger().isDebugEnabled()) {
312: StringBuffer debugBuffer = new StringBuffer(64)
313: .append("==== Removed from spool mail ")
314: .append(key).append("====");
315: getLogger().debug(debugBuffer.toString());
316: }
317: } else {
318: // spool.remove() has a side-effect! It unlocks the
319: // message so that other threads can work on it! If
320: // we don't remove it, we must unlock it!
321: spool.store(mail);
322: ContainerUtil.dispose(mail);
323: spool.unlock(key);
324: // Do not notify: we simply updated the current mail
325: // and we are able to reprocess it now.
326: }
327: mail = null;
328: } catch (InterruptedException ie) {
329: getLogger().info(
330: "Interrupted JamesSpoolManager: "
331: + Thread.currentThread().getName());
332: } catch (Throwable e) {
333: if (getLogger().isErrorEnabled()) {
334: getLogger().error(
335: "Exception processing " + key
336: + " in JamesSpoolManager.run "
337: + e.getMessage(), e);
338: }
339: /* Move the mail to ERROR state? If we do, it could be
340: * deleted if an error occurs in the ERROR processor.
341: * Perhaps the answer is to resolve that issue by
342: * having a special state for messages that are not to
343: * be processed, but aren't to be deleted? The message
344: * would already be in the spool, but would not be
345: * touched again.
346: if (mail != null) {
347: try {
348: mail.setState(Mail.ERROR);
349: spool.store(mail);
350: }
351: }
352: */
353: }
354: }
355: if (getLogger().isInfoEnabled()) {
356: getLogger().info(
357: "Stop JamesSpoolManager: "
358: + Thread.currentThread().getName());
359: }
360: numActive--;
361: }
362:
363: /**
364: * Process this mail message by the appropriate processor as designated
365: * in the state of the Mail object.
366: *
367: * @param mail the mail message to be processed
368: */
369: protected void process(Mail mail) {
370: while (true) {
371: String processorName = mail.getState();
372: if (processorName.equals(Mail.GHOST)) {
373: //This message should disappear
374: return;
375: }
376: try {
377: LinearProcessor processor = (LinearProcessor) processors
378: .get(processorName);
379: if (processor == null) {
380: StringBuffer exceptionMessageBuffer = new StringBuffer(
381: 128).append("Unable to find processor ")
382: .append(processorName).append(
383: " requested for processing of ")
384: .append(mail.getName());
385: String exceptionMessage = exceptionMessageBuffer
386: .toString();
387: getLogger().debug(exceptionMessage);
388: mail.setState(Mail.ERROR);
389: throw new MailetException(exceptionMessage);
390: }
391: StringBuffer logMessageBuffer = null;
392: if (getLogger().isDebugEnabled()) {
393: logMessageBuffer = new StringBuffer(64).append(
394: "Processing ").append(mail.getName())
395: .append(" through ").append(processorName);
396: getLogger().debug(logMessageBuffer.toString());
397: }
398: processor.service(mail);
399: if (getLogger().isDebugEnabled()) {
400: logMessageBuffer = new StringBuffer(128).append(
401: "Processed ").append(mail.getName())
402: .append(" through ").append(processorName);
403: getLogger().debug(logMessageBuffer.toString());
404: getLogger().debug("Result was " + mail.getState());
405: }
406: return;
407: } catch (Throwable e) {
408: // This is a strange error situation that shouldn't ordinarily
409: // happen
410: StringBuffer exceptionBuffer = new StringBuffer(64)
411: .append("Exception in processor <").append(
412: processorName).append(">");
413: getLogger().error(exceptionBuffer.toString(), e);
414: if (processorName.equals(Mail.ERROR)) {
415: // We got an error on the error processor...
416: // kill the message
417: mail.setState(Mail.GHOST);
418: mail.setErrorMessage(e.getMessage());
419: } else {
420: //We got an error... send it to the requested processor
421: if (!(e instanceof MessagingException)) {
422: //We got an error... send it to the error processor
423: mail.setState(Mail.ERROR);
424: }
425: mail.setErrorMessage(e.getMessage());
426: }
427: }
428: if (getLogger().isErrorEnabled()) {
429: StringBuffer logMessageBuffer = new StringBuffer(128)
430: .append("An error occurred processing ")
431: .append(mail.getName()).append(" through ")
432: .append(processorName);
433: getLogger().error(logMessageBuffer.toString());
434: getLogger().error("Result was " + mail.getState());
435: }
436: }
437: }
438:
439: /**
440: * The dispose operation is called at the end of a components lifecycle.
441: * Instances of this class use this method to release and destroy any
442: * resources that they own.
443: *
444: * This implementation shuts down the LinearProcessors managed by this
445: * JamesSpoolManager
446: *
447: * @throws Exception if an error is encountered during shutdown
448: */
449: public void dispose() {
450: getLogger().info("JamesSpoolManager dispose...");
451: active = false; // shutdown the threads
452: for (Iterator it = spoolThreads.iterator(); it.hasNext();) {
453: ((Thread) it.next()).interrupt(); // interrupt any waiting accept() calls.
454: }
455:
456: long stop = System.currentTimeMillis() + 60000;
457: // give the spooler threads one minute to terminate gracefully
458: while (numActive != 0 && stop > System.currentTimeMillis()) {
459: try {
460: Thread.sleep(1000);
461: } catch (Exception ignored) {
462: }
463: }
464: getLogger()
465: .info("JamesSpoolManager thread shutdown completed.");
466:
467: Iterator it = processors.keySet().iterator();
468: while (it.hasNext()) {
469: String processorName = (String) it.next();
470: if (getLogger().isDebugEnabled()) {
471: getLogger().debug("Processor " + processorName);
472: }
473: LinearProcessor processor = (LinearProcessor) processors
474: .get(processorName);
475: processor.dispose();
476: processors.remove(processor);
477: }
478: }
479:
480: }
|