001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.transport.mailets;
019:
020: import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
021: import org.apache.avalon.excalibur.datasource.DataSourceComponent;
022: import org.apache.avalon.framework.service.ServiceManager;
023: import org.apache.james.Constants;
024: import org.apache.james.util.JDBCBayesianAnalyzer;
025: import org.apache.james.util.JDBCUtil;
026: import org.apache.mailet.GenericMailet;
027: import org.apache.mailet.Mail;
028: import org.apache.mailet.MailAddress;
029: import org.apache.mailet.RFC2822Headers;
030: import org.apache.mailet.dates.RFC822DateFormat;
031:
032: import javax.mail.Message;
033: import javax.mail.MessagingException;
034: import javax.mail.Session;
035: import javax.mail.internet.InternetAddress;
036: import javax.mail.internet.MimeBodyPart;
037: import javax.mail.internet.MimeMessage;
038: import javax.mail.internet.MimeMultipart;
039:
040: import java.io.BufferedReader;
041: import java.io.ByteArrayOutputStream;
042: import java.io.StringReader;
043: import java.sql.Connection;
044: import java.text.DecimalFormat;
045: import java.util.Collection;
046: import java.util.HashSet;
047: import java.util.Iterator;
048: import java.util.Set;
049:
050: /**
051: * <P>Spam detection mailet using bayesian analysis techniques.</P>
052: *
053: * <P>Sets an email message header indicating the
054: * probability that an email message is SPAM.</P>
055: *
056: * <P>Based upon the principles described in:
057: * <a href="http://www.paulgraham.com/spam.html">A Plan For Spam</a>
058: * by Paul Graham.
059: * Extended to Paul Graham's <a href="http://paulgraham.com/better.html">Better Bayesian Filtering</a>.</P>
060: *
061: * <P>The analysis capabilities are based on token frequencies (the <I>Corpus</I>)
062: * learned through a training process (see {@link BayesianAnalysisFeeder})
063: * and stored in a JDBC database.
064: * After a training session, the Corpus must be rebuilt from the database in order to
065: * acquire the new frequencies.
066: * Every 10 minutes a special thread in this mailet will check if any
067: * change was made to the database by the feeder, and rebuild the corpus if necessary.</p>
068: *
069: * <p>A <CODE>org.apache.james.spam.probability</CODE> mail attribute will be created
070: * containing the computed spam probability as a {@link java.lang.Double}.
071: * The <CODE>headerName</CODE> message header string will be created containing such
072: * probability in floating point representation.</p>
073: *
074: * <P>Sample configuration:</P>
075: * <PRE><CODE>
076: * <mailet match="All" class="BayesianAnalysis">
077: * <repositoryPath>db://maildb</repositoryPath>
078: * <!--
079: * Set this to the header name to add with the spam probability
080: * (default is "X-MessageIsSpamProbability").
081: * -->
082: * <headerName>X-MessageIsSpamProbability</headerName>
083: * <!--
084: * Set this to true if you want to ignore messages coming from local senders
085: * (default is false).
086: * By local sender we mean a return-path with a local server part (server listed
087: * in <servernames> in config.xml).
088: * -->
089: * <ignoreLocalSender>true</ignoreLocalSender>
090: * <!--
091: * Set this to the maximum message size (in bytes) that a message may have
092: * to be considered spam (default is 100000).
093: * -->
094: * <maxSize>100000</maxSize>
095: * </mailet>
096: * </CODE></PRE>
097: *
098: * <P>The probability of being spam is pre-pended to the subject if
099: * it is > 0.1 (10%).</P>
100: *
101: * <P>The required tables are automatically created if not already there (see sqlResources.xml).
102: * The token field in both the ham and spam tables is <B>case sensitive</B>.</P>
103: * @see BayesianAnalysisFeeder
104: * @see org.apache.james.util.BayesianAnalyzer
105: * @see org.apache.james.util.JDBCBayesianAnalyzer
106: * @version CVS $Revision: $ $Date: $
107: * @since 2.3.0
108: */
109:
110: public class BayesianAnalysis extends GenericMailet {
111: /**
112: * The JDBCUtil helper class
113: */
114: private final JDBCUtil theJDBCUtil = new JDBCUtil() {
115: protected void delegatedLog(String logString) {
116: log("BayesianAnalysis: " + logString);
117: }
118: };
119:
120: /**
121: * The JDBCBayesianAnalyzer class that does all the work.
122: */
123: private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
124: protected void delegatedLog(String logString) {
125: log("BayesianAnalysis: " + logString);
126: }
127: };
128:
129: private DataSourceComponent datasource;
130: private String repositoryPath;
131:
132: private static final String MAIL_ATTRIBUTE_NAME = "org.apache.james.spam.probability";
133: private static final String HEADER_NAME = "X-MessageIsSpamProbability";
134: private static final long CORPUS_RELOAD_INTERVAL = 600000;
135: private String headerName;
136: private boolean ignoreLocalSender = false;
137:
138: /** The date format object used to generate RFC 822 compliant date headers. */
139: private RFC822DateFormat rfc822DateFormat = new RFC822DateFormat();
140:
141: /**
142: * Return a string describing this mailet.
143: *
144: * @return a string describing this mailet
145: */
146: public String getMailetInfo() {
147: return "BayesianAnalysis Mailet";
148: }
149:
150: /**
151: * Holds value of property maxSize.
152: */
153: private int maxSize = 100000;
154:
155: /**
156: * Holds value of property lastCorpusLoadTime.
157: */
158: private long lastCorpusLoadTime;
159:
160: /**
161: * Getter for property maxSize.
162: * @return Value of property maxSize.
163: */
164: public int getMaxSize() {
165:
166: return this .maxSize;
167: }
168:
169: /**
170: * Setter for property maxSize.
171: * @param maxSize New value of property maxSize.
172: */
173: public void setMaxSize(int maxSize) {
174:
175: this .maxSize = maxSize;
176: }
177:
178: /**
179: * Getter for property lastCorpusLoadTime.
180: * @return Value of property lastCorpusLoadTime.
181: */
182: public long getLastCorpusLoadTime() {
183:
184: return this .lastCorpusLoadTime;
185: }
186:
187: /**
188: * Sets lastCorpusLoadTime to System.currentTimeMillis().
189: */
190: private void touchLastCorpusLoadTime() {
191:
192: this .lastCorpusLoadTime = System.currentTimeMillis();
193: }
194:
195: /**
196: * Mailet initialization routine.
197: * @throws MessagingException if a problem arises
198: */
199: public void init() throws MessagingException {
200: repositoryPath = getInitParameter("repositoryPath");
201:
202: if (repositoryPath == null) {
203: throw new MessagingException("repositoryPath is null");
204: }
205:
206: headerName = getInitParameter("headerName", HEADER_NAME);
207:
208: ignoreLocalSender = Boolean.valueOf(
209: getInitParameter("ignoreLocalSender")).booleanValue();
210:
211: if (ignoreLocalSender) {
212: log("Will ignore messages coming from local senders");
213: } else {
214: log("Will analyze messages coming from local senders");
215: }
216:
217: String maxSizeParam = getInitParameter("maxSize");
218: if (maxSizeParam != null) {
219: setMaxSize(Integer.parseInt(maxSizeParam));
220: }
221: log("maxSize: " + getMaxSize());
222:
223: initDb();
224:
225: CorpusLoader corpusLoader = new CorpusLoader(this );
226: corpusLoader.setDaemon(true);
227: corpusLoader.start();
228:
229: }
230:
231: private void initDb() throws MessagingException {
232:
233: try {
234: ServiceManager serviceManager = (ServiceManager) getMailetContext()
235: .getAttribute(Constants.AVALON_COMPONENT_MANAGER);
236:
237: // Get the DataSourceSelector block
238: DataSourceSelector datasources = (DataSourceSelector) serviceManager
239: .lookup(DataSourceSelector.ROLE);
240:
241: // Get the data-source required.
242: int stindex = repositoryPath.indexOf("://") + 3;
243:
244: String datasourceName = repositoryPath.substring(stindex);
245:
246: datasource = (DataSourceComponent) datasources
247: .select(datasourceName);
248: } catch (Exception e) {
249: throw new MessagingException("Can't get datasource", e);
250: }
251:
252: try {
253: analyzer.initSqlQueries(datasource.getConnection(),
254: getMailetContext());
255: } catch (Exception e) {
256: throw new MessagingException(
257: "Exception initializing queries", e);
258: }
259:
260: try {
261: loadData(datasource.getConnection());
262: } catch (java.sql.SQLException se) {
263: throw new MessagingException("SQLException loading data",
264: se);
265: }
266: }
267:
268: /**
269: * Scans the mail and determines the spam probability.
270: *
271: * @param mail The Mail message to be scanned.
272: * @throws MessagingException if a problem arises
273: */
274: public void service(Mail mail) throws MessagingException {
275:
276: try {
277: MimeMessage message = mail.getMessage();
278:
279: if (ignoreLocalSender) {
280: // ignore the message if the sender is local
281: if (mail.getSender() != null
282: && getMailetContext().isLocalServer(
283: mail.getSender().getHost())) {
284: return;
285: }
286: }
287:
288: String[] headerArray = message.getHeader(headerName);
289: // ignore the message if already analyzed
290: if (headerArray != null && headerArray.length > 0) {
291: return;
292: }
293:
294: ByteArrayOutputStream baos = new ByteArrayOutputStream();
295:
296: double probability;
297:
298: if (message.getSize() < getMaxSize()) {
299: message.writeTo(baos);
300: probability = analyzer
301: .computeSpamProbability(new BufferedReader(
302: new StringReader(baos.toString())));
303: } else {
304: probability = 0.0;
305: }
306:
307: mail.setAttribute(MAIL_ATTRIBUTE_NAME, new Double(
308: probability));
309: message.setHeader(headerName, Double.toString(probability));
310:
311: DecimalFormat probabilityForm = (DecimalFormat) DecimalFormat
312: .getInstance();
313: probabilityForm.applyPattern("##0.##%");
314: String probabilityString = probabilityForm
315: .format(probability);
316:
317: String senderString;
318: if (mail.getSender() == null) {
319: senderString = "null";
320: } else {
321: senderString = mail.getSender().toString();
322: }
323: if (probability > 0.1) {
324: log(headerName + ": " + probabilityString + "; From: "
325: + senderString + "; Recipient(s): "
326: + getAddressesString(mail.getRecipients()));
327:
328: appendToSubject(message, " [" + probabilityString
329: + (probability > 0.9 ? " SPAM" : " spam") + "]");
330: }
331:
332: saveChanges(message);
333:
334: } catch (Exception e) {
335: log("Exception: " + e.getMessage(), e);
336: throw new MessagingException("Exception thrown", e);
337: }
338: }
339:
340: private void loadData(Connection conn) throws java.sql.SQLException {
341:
342: try {
343: // this is synchronized to avoid concurrent update of the corpus
344: synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
345: analyzer.tokenCountsClear();
346: analyzer.loadHamNSpam(conn);
347: analyzer.buildCorpus();
348: analyzer.tokenCountsClear();
349: }
350:
351: log("BayesianAnalysis Corpus loaded");
352:
353: touchLastCorpusLoadTime();
354:
355: } finally {
356: if (conn != null) {
357: theJDBCUtil.closeJDBCConnection(conn);
358: }
359: }
360:
361: }
362:
363: private String getAddressesString(Collection addresses) {
364: if (addresses == null) {
365: return "null";
366: }
367:
368: Iterator iter = addresses.iterator();
369: StringBuffer sb = new StringBuffer();
370: sb.append('[');
371: for (int i = 0; iter.hasNext(); i++) {
372: sb.append(iter.next());
373: if (i + 1 < addresses.size()) {
374: sb.append(", ");
375: }
376: }
377: sb.append(']');
378: return sb.toString();
379: }
380:
381: private void appendToSubject(MimeMessage message, String toAppend) {
382: try {
383: String subject = message.getSubject();
384:
385: if (subject == null) {
386: message.setSubject(toAppend, "iso-8859-1");
387: } else {
388: message.setSubject(toAppend + " " + subject,
389: "iso-8859-1");
390: }
391: } catch (MessagingException ex) {
392: }
393: }
394:
395: private void sendReplyFromPostmaster(Mail mail, String stringContent)
396: throws MessagingException {
397: try {
398: MailAddress notifier = getMailetContext().getPostmaster();
399:
400: MailAddress senderMailAddress = mail.getSender();
401:
402: MimeMessage message = mail.getMessage();
403: //Create the reply message
404: MimeMessage reply = new MimeMessage(Session
405: .getDefaultInstance(System.getProperties(), null));
406:
407: //Create the list of recipients in the Address[] format
408: InternetAddress[] rcptAddr = new InternetAddress[1];
409: rcptAddr[0] = senderMailAddress.toInternetAddress();
410: reply.setRecipients(Message.RecipientType.TO, rcptAddr);
411:
412: //Set the sender...
413: reply.setFrom(notifier.toInternetAddress());
414:
415: //Create the message body
416: MimeMultipart multipart = new MimeMultipart();
417: //Add message as the first mime body part
418: MimeBodyPart part = new MimeBodyPart();
419: part.setContent(stringContent, "text/plain");
420: part.setHeader(RFC2822Headers.CONTENT_TYPE, "text/plain");
421: multipart.addBodyPart(part);
422:
423: reply.setContent(multipart);
424: reply.setHeader(RFC2822Headers.CONTENT_TYPE, multipart
425: .getContentType());
426:
427: //Create the list of recipients in our MailAddress format
428: Set recipients = new HashSet();
429: recipients.add(senderMailAddress);
430:
431: //Set additional headers
432: if (reply.getHeader(RFC2822Headers.DATE) == null) {
433: reply.setHeader(RFC2822Headers.DATE, rfc822DateFormat
434: .format(new java.util.Date()));
435: }
436: String subject = message.getSubject();
437: if (subject == null) {
438: subject = "";
439: }
440: if (subject.indexOf("Re:") == 0) {
441: reply.setSubject(subject);
442: } else {
443: reply.setSubject("Re:" + subject);
444: }
445: reply.setHeader(RFC2822Headers.IN_REPLY_TO, message
446: .getMessageID());
447:
448: //Send it off...
449: getMailetContext().sendMail(notifier, recipients, reply);
450: } catch (Exception e) {
451: log("Exception found sending reply", e);
452: }
453: }
454:
455: /**
456: * Saves changes resetting the original message id.
457: */
458: private void saveChanges(MimeMessage message)
459: throws MessagingException {
460: String messageId = message.getMessageID();
461: message.saveChanges();
462: if (messageId != null) {
463: message.setHeader(RFC2822Headers.MESSAGE_ID, messageId);
464: }
465: }
466:
467: private static class CorpusLoader extends Thread {
468:
469: private BayesianAnalysis analysis;
470:
471: private CorpusLoader(BayesianAnalysis analysis) {
472: super ("BayesianAnalysis Corpus Loader");
473: this .analysis = analysis;
474: }
475:
476: /** Thread entry point.
477: */
478: public void run() {
479: analysis
480: .log("CorpusLoader thread started: will wake up every "
481: + CORPUS_RELOAD_INTERVAL + " ms");
482:
483: try {
484: Thread.sleep(CORPUS_RELOAD_INTERVAL);
485:
486: while (true) {
487: if (analysis.getLastCorpusLoadTime() < JDBCBayesianAnalyzer
488: .getLastDatabaseUpdateTime()) {
489: analysis.log("Reloading Corpus ...");
490: try {
491: analysis.loadData(analysis.datasource
492: .getConnection());
493: analysis.log("Corpus reloaded");
494: } catch (java.sql.SQLException se) {
495: analysis.log("SQLException: ", se);
496: }
497:
498: }
499:
500: if (Thread.interrupted()) {
501: break;
502: }
503: Thread.sleep(CORPUS_RELOAD_INTERVAL);
504: }
505: } catch (InterruptedException ex) {
506: interrupt();
507: }
508: }
509:
510: }
511:
512: }
|