001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.transport.mailets;
019:
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.StringReader;

import java.sql.Connection;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;

import javax.mail.Header;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;

import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
import org.apache.avalon.excalibur.datasource.DataSourceComponent;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.james.Constants;
import org.apache.james.util.JDBCBayesianAnalyzer;
import org.apache.james.util.JDBCUtil;
import org.apache.mailet.GenericMailet;
import org.apache.mailet.Mail;
040:
041: /**
042: * <P>Feeds ham OR spam messages to train the {@link BayesianAnalysis} mailet.</P>
043: *
044: * <P>The new token frequencies will be stored in a JDBC database.</P>
045: *
046: * <P>Sample configuration:</P>
047: * <PRE><CODE>
* &lt;processor name="root"&gt;
*
* &lt;mailet match="RecipientIs=not.spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
* &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
* &lt;feedType&gt;ham&lt;/feedType&gt;
* &lt;!--
* Set this to the maximum size (in bytes) of a message to be fed;
* larger messages are ignored (default is 100000).
* --&gt;
* &lt;maxSize&gt;100000&lt;/maxSize&gt;
* &lt;/mailet&gt;
*
* &lt;mailet match="RecipientIs=spam@thisdomain.com" class="BayesianAnalysisFeeder"&gt;
* &lt;repositoryPath&gt; db://maildb &lt;/repositoryPath&gt;
* &lt;feedType&gt;spam&lt;/feedType&gt;
* &lt;!--
* Set this to the maximum size (in bytes) of a message to be fed;
* larger messages are ignored (default is 100000).
* --&gt;
* &lt;maxSize&gt;100000&lt;/maxSize&gt;
* &lt;/mailet&gt;
*
* &lt;/processor&gt;
071: * </CODE></PRE>
072: *
073: * <P>The previous example will allow the user to send messages to the server
074: * and use the recipient email address as the indicator for whether the message
075: * is ham or spam.</P>
076: *
077: * <P>Using the example above, send good messages (ham not spam) to the email
078: * address "not.spam@thisdomain.com" to pump good messages into the feeder,
079: * and send spam messages (spam not ham) to the email
080: * address "spam@thisdomain.com" to pump spam messages into the feeder.</P>
081: *
* <p>The Bayesian database tables are updated during training to reflect
* the new data.</p>
084: *
085: * <P>At the end the mail will be destroyed (ghosted).</P>
086: *
087: * <P><B>The correct approach is to send the original ham/spam message as an attachment
088: * to another message sent to the feeder; all the headers of the enveloping message
089: * will be removed and only the original message's tokens will be analyzed.</B></P>
090: *
091: * <p>After a training session, the frequency <i>Corpus</i> used by <CODE>BayesianAnalysis</CODE>
092: * must be rebuilt from the database, in order to take advantage of the new token frequencies.
093: * Every 10 minutes a special thread in the <CODE>BayesianAnalysis</CODE> mailet will check if any
094: * change was made to the database, and rebuild the corpus if necessary.</p>
095: *
096: * <p>Only one message at a time is scanned (the database update activity is <I>synchronized</I>)
097: * in order to avoid too much database locking,
098: * as thousands of rows may be updated just for one message fed.</p>
099: * @see BayesianAnalysis
100: * @see org.apache.james.util.BayesianAnalyzer
101: * @see org.apache.james.util.JDBCBayesianAnalyzer
102: * @version CVS $Revision: $ $Date: $
103: * @since 2.3.0
104: */
105:
106: public class BayesianAnalysisFeeder extends GenericMailet {
107: /**
108: * The JDBCUtil helper class
109: */
110: private final JDBCUtil theJDBCUtil = new JDBCUtil() {
111: protected void delegatedLog(String logString) {
112: log("BayesianAnalysisFeeder: " + logString);
113: }
114: };
115:
116: /**
117: * The JDBCBayesianAnalyzer class that does all the work.
118: */
119: private JDBCBayesianAnalyzer analyzer = new JDBCBayesianAnalyzer() {
120: protected void delegatedLog(String logString) {
121: log("BayesianAnalysisFeeder: " + logString);
122: }
123: };
124:
125: private DataSourceComponent datasource;
126: private String repositoryPath;
127:
128: private String feedType;
129:
130: /**
131: * Return a string describing this mailet.
132: *
133: * @return a string describing this mailet
134: */
135: public String getMailetInfo() {
136: return "BayesianAnalysisFeeder Mailet";
137: }
138:
139: /**
140: * Holds value of property maxSize.
141: */
142: private int maxSize = 100000;
143:
144: /**
145: * Getter for property maxSize.
146: * @return Value of property maxSize.
147: */
148: public int getMaxSize() {
149:
150: return this .maxSize;
151: }
152:
153: /**
154: * Setter for property maxSize.
155: * @param maxSize New value of property maxSize.
156: */
157: public void setMaxSize(int maxSize) {
158:
159: this .maxSize = maxSize;
160: }
161:
162: /**
163: * Mailet initialization routine.
164: * @throws MessagingException if a problem arises
165: */
166: public void init() throws MessagingException {
167: repositoryPath = getInitParameter("repositoryPath");
168:
169: if (repositoryPath == null) {
170: throw new MessagingException("repositoryPath is null");
171: }
172:
173: feedType = getInitParameter("feedType");
174: if (feedType == null) {
175: throw new MessagingException("feedType is null");
176: }
177:
178: String maxSizeParam = getInitParameter("maxSize");
179: if (maxSizeParam != null) {
180: setMaxSize(Integer.parseInt(maxSizeParam));
181: }
182: log("maxSize: " + getMaxSize());
183:
184: initDb();
185:
186: }
187:
188: private void initDb() throws MessagingException {
189:
190: try {
191: ServiceManager serviceManager = (ServiceManager) getMailetContext()
192: .getAttribute(Constants.AVALON_COMPONENT_MANAGER);
193:
194: // Get the DataSourceSelector block
195: DataSourceSelector datasources = (DataSourceSelector) serviceManager
196: .lookup(DataSourceSelector.ROLE);
197:
198: // Get the data-source required.
199: int stindex = repositoryPath.indexOf("://") + 3;
200:
201: String datasourceName = repositoryPath.substring(stindex);
202:
203: datasource = (DataSourceComponent) datasources
204: .select(datasourceName);
205: } catch (Exception e) {
206: throw new MessagingException("Can't get datasource", e);
207: }
208:
209: try {
210: analyzer.initSqlQueries(datasource.getConnection(),
211: getMailetContext());
212: } catch (Exception e) {
213: throw new MessagingException(
214: "Exception initializing queries", e);
215: }
216:
217: }
218:
219: /**
220: * Scans the mail and updates the token frequencies in the database.
221: *
222: * The method is synchronized in order to avoid too much database locking,
223: * as thousands of rows may be updated just for one message fed.
224: *
225: * @param mail The Mail message to be scanned.
226: */
227: public void service(Mail mail) {
228: boolean dbUpdated = false;
229:
230: mail.setState(Mail.GHOST);
231:
232: ByteArrayOutputStream baos = new ByteArrayOutputStream();
233:
234: Connection conn = null;
235:
236: try {
237:
238: MimeMessage message = mail.getMessage();
239:
240: String messageId = message.getMessageID();
241:
242: if (message.getSize() > getMaxSize()) {
243: log(messageId
244: + " Feeding HAM/SPAM ignored because message size > "
245: + getMaxSize() + ": " + message.getSize());
246: return;
247: }
248:
249: clearAllHeaders(message);
250:
251: message.writeTo(baos);
252:
253: BufferedReader br = new BufferedReader(new StringReader(
254: baos.toString()));
255:
256: // this is synchronized to avoid concurrent update of the corpus
257: synchronized (JDBCBayesianAnalyzer.DATABASE_LOCK) {
258:
259: conn = datasource.getConnection();
260:
261: if (conn.getAutoCommit()) {
262: conn.setAutoCommit(false);
263: }
264:
265: dbUpdated = true;
266:
267: //Clear out any existing word/counts etc..
268: analyzer.clear();
269:
270: if ("ham".equalsIgnoreCase(feedType)) {
271: log(messageId + " Feeding HAM");
272: //Process the stream as ham (not spam).
273: analyzer.addHam(br);
274:
275: //Update storage statistics.
276: analyzer.updateHamTokens(conn);
277: } else {
278: log(messageId + " Feeding SPAM");
279: //Process the stream as spam.
280: analyzer.addSpam(br);
281:
282: //Update storage statistics.
283: analyzer.updateSpamTokens(conn);
284: }
285:
286: //Commit our changes if necessary.
287: if (conn != null && dbUpdated && !conn.getAutoCommit()) {
288: conn.commit();
289: dbUpdated = false;
290: log(messageId + " Training ended successfully");
291: JDBCBayesianAnalyzer.touchLastDatabaseUpdateTime();
292: }
293:
294: }
295:
296: } catch (java.sql.SQLException se) {
297: log("SQLException: " + se.getMessage());
298: } catch (java.io.IOException ioe) {
299: log("IOException: " + ioe.getMessage());
300: } catch (javax.mail.MessagingException me) {
301: log("MessagingException: " + me.getMessage());
302: } finally {
303: //Rollback our changes if necessary.
304: try {
305: if (conn != null && dbUpdated && !conn.getAutoCommit()) {
306: conn.rollback();
307: dbUpdated = false;
308: }
309: } catch (Exception e) {
310: }
311: theJDBCUtil.closeJDBCConnection(conn);
312: }
313: }
314:
315: private void clearAllHeaders(MimeMessage message)
316: throws javax.mail.MessagingException {
317: Enumeration headers = message.getAllHeaders();
318:
319: while (headers.hasMoreElements()) {
320: Header header = (Header) headers.nextElement();
321: try {
322: message.removeHeader(header.getName());
323: } catch (javax.mail.MessagingException me) {
324: }
325: }
326: message.saveChanges();
327: }
328:
329: }
|