001: /*
002: * Copyright 2001-2007 Uwyn bvba/sprl <info[remove] at uwyn dot com>
003: * Distributed under the terms of the GNU General Public License, v2 or later
004: * $Id: DatabaseMailQueueExecutor.java 3634 2007-01-08 21:42:24Z gbevin $
005: */
006: package com.uwyn.rife.mail.executors;
007:
008: import com.uwyn.rife.database.Datasource;
009: import com.uwyn.rife.database.Datasources;
010: import com.uwyn.rife.database.DbBeanFetcher;
011: import com.uwyn.rife.database.DbQueryManager;
012: import com.uwyn.rife.database.queries.Update;
013: import com.uwyn.rife.database.querymanagers.generic.GenericQueryManager;
014: import com.uwyn.rife.database.querymanagers.generic.GenericQueryManagerFactory;
015: import com.uwyn.rife.mail.Email;
016: import com.uwyn.rife.mail.MailQueueExecutor;
017: import com.uwyn.rife.scheduler.Executor;
018: import com.uwyn.rife.scheduler.Task;
019: import com.uwyn.rife.scheduler.exceptions.SchedulerException;
020: import com.uwyn.rife.tools.Base64;
021: import com.uwyn.rife.tools.ExceptionUtils;
022: import com.uwyn.rife.tools.StringUtils;
023: import java.io.ByteArrayInputStream;
024: import java.util.Collection;
025: import java.util.Date;
026: import java.util.Properties;
027: import java.util.logging.Logger;
028: import javax.mail.Message;
029: import javax.mail.MessagingException;
030: import javax.mail.Session;
031: import javax.mail.Transport;
032: import javax.mail.internet.InternetAddress;
033: import javax.mail.internet.MimeMessage;
034:
035: public class DatabaseMailQueueExecutor extends Executor implements
036: MailQueueExecutor {
037: private final static Object RUNNING_MONITOR = new Object();
038:
039: private static boolean sRunning = false;
040:
041: private String mMailqueueName = "";
042:
043: public boolean isDeliveryAllowed(Email email) {
044: return true;
045: }
046:
047: public boolean executeTask(Task task) {
048: // obtain the mail queue name
049: try {
050: mMailqueueName = task.getTaskoptionValue("name");
051: } catch (SchedulerException e) {
052: mMailqueueName = null;
053: }
054: if (null == mMailqueueName) {
055: mMailqueueName = "";
056: } else {
057: mMailqueueName += " : ";
058: }
059:
060: Logger.getLogger("com.uwyn.rife.mail").finest(
061: mMailqueueName + "Running MailQueueExecutor.");
062:
063: // obtain the datasource name
064: String datasource_name = null;
065: try {
066: datasource_name = task.getTaskoptionValue("datasource");
067: } catch (SchedulerException e) {
068: Logger
069: .getLogger("com.uwyn.rife.mail")
070: .severe(
071: mMailqueueName
072: + "Unexpected error while obtaining the MailQueueExecutor's 'datasource' task option value.\n"
073: + ExceptionUtils
074: .getExceptionStackTrace(e));
075: }
076: if (null == datasource_name) {
077: Logger
078: .getLogger("com.uwyn.rife.mail")
079: .severe(
080: mMailqueueName
081: + "Missing 'datasource' task option for the MailQueueExecutor.");
082: return false;
083: }
084:
085: // obtain the smtp server name
086: String smtp_server = null;
087: try {
088: smtp_server = task.getTaskoptionValue("smtp_server");
089: } catch (SchedulerException e) {
090: Logger
091: .getLogger("com.uwyn.rife.mail")
092: .severe(
093: mMailqueueName
094: + "Unexpected error while obtaining the MailQueueExecutor's 'smtp_server' task option value.\n"
095: + ExceptionUtils
096: .getExceptionStackTrace(e));
097: return false;
098: }
099:
100: // obtain the smtp ssl
101: boolean smtp_ssl = false;
102: try {
103: smtp_ssl = StringUtils.convertToBoolean(task
104: .getTaskoptionValue("smtp_ssl"));
105: } catch (SchedulerException e) {
106: Logger
107: .getLogger("com.uwyn.rife.mail")
108: .severe(
109: mMailqueueName
110: + "Unexpected error while obtaining the MailQueueExecutor's 'smtp_ssl' task option value.\n"
111: + ExceptionUtils
112: .getExceptionStackTrace(e));
113: }
114:
115: // obtain the smtp server port
116: String smtp_port = null;
117: try {
118: smtp_port = task.getTaskoptionValue("smtp_port");
119: } catch (SchedulerException e) {
120: Logger
121: .getLogger("com.uwyn.rife.mail")
122: .severe(
123: mMailqueueName
124: + "Unexpected error while obtaining the MailQueueExecutor's 'smtp_port' task option value, using the default value.\n"
125: + ExceptionUtils
126: .getExceptionStackTrace(e));
127: }
128:
129: if (null == smtp_port) {
130: if (smtp_ssl) {
131: smtp_port = "465";
132: } else {
133: smtp_port = "25";
134: }
135: }
136:
137: // obtain the smtp username
138: // obtain the smtp password
139: // obtained together, because they must both be provided to be valid
140: String smtp_username = null;
141: String smtp_password = null;
142: try {
143: smtp_username = task.getTaskoptionValue("smtp_username");
144: } catch (SchedulerException e) {
145: Logger
146: .getLogger("com.uwyn.rife.mail")
147: .severe(
148: mMailqueueName
149: + "Unexpected error while obtaining the MailQueueExecutor's 'smtp_username' task option value.\n"
150: + ExceptionUtils
151: .getExceptionStackTrace(e));
152: return false;
153: }
154: try {
155: smtp_password = task.getTaskoptionValue("smtp_password");
156: } catch (SchedulerException e) {
157: Logger
158: .getLogger("com.uwyn.rife.mail")
159: .severe(
160: mMailqueueName
161: + "Unexpected error while obtaining the MailQueueExecutor's 'smtp_password' task option value.\n"
162: + ExceptionUtils
163: .getExceptionStackTrace(e));
164: return false;
165: }
166: if ((smtp_username != null || smtp_password != null)
167: && (null == smtp_username || null == smtp_password)) {
168: Logger
169: .getLogger("com.uwyn.rife.mail")
170: .severe(
171: mMailqueueName
172: + "Unexpected error while obtaining the MailQueueExecutor's 'smtp_username' and 'smtp_password' task option values, they must be provided together.\n");
173: return false;
174: }
175:
176: Datasource datasource = Datasources.getRepInstance()
177: .getDatasource(datasource_name);
178: if (null == datasource) {
179: Logger
180: .getLogger("com.uwyn.rife.mail")
181: .severe(
182: mMailqueueName
183: + "Unexpected error: the MailQueueExecutor's datasource '"
184: + datasource
185: + "' could not be found.");
186: return false;
187: }
188:
189: String smtp_from = null;
190: try {
191: smtp_from = task.getTaskoptionValue("smtp_from");
192: } catch (SchedulerException e) {
193: Logger
194: .getLogger("com.uwyn.rife.mail")
195: .severe(
196: mMailqueueName
197: + "Unexpected error while obtaining the MailQueueExecutor's 'smtp_from' task option value.\n"
198: + ExceptionUtils
199: .getExceptionStackTrace(e));
200: return false;
201: }
202:
203: GenericQueryManager<Email> emails = GenericQueryManagerFactory
204: .getInstance(datasource, Email.class);
205: synchronized (RUNNING_MONITOR) {
206: if (!sRunning) {
207: Logger
208: .getLogger("com.uwyn.rife.mail")
209: .finest(
210: mMailqueueName
211: + "MailQueueExecutor not currently running, starting a new one.");
212:
213: // not currrently running, start a new run
214: sRunning = true;
215:
216: try {
217: // flag messages for sending
218: Update flag_messages = new Update(datasource)
219: .table(emails.getTable()).field(
220: "queueFlag", true);
221: new DbQueryManager(datasource)
222: .executeUpdate(flag_messages);
223: Logger
224: .getLogger("com.uwyn.rife.mail")
225: .finest(
226: mMailqueueName
227: + "MailQueueExecutor's flags set, starting processing.");
228:
229: Session session = null;
230: // connect to the SMTP server
231: if (smtp_server != null && smtp_server.length() > 0) {
232: Properties props = new Properties();
233: props.put("mail.transport", "smtp");
234: props.put("mail.host", smtp_server);
235: props.put("mail.port", smtp_port);
236: props.put("mail.smtp.host", smtp_server);
237: props.put("mail.smtp.port", smtp_port);
238:
239: if (smtp_username != null
240: && smtp_password != null) {
241: if (smtp_ssl) {
242: props.put("mail.smtps.auth", "true");
243: } else {
244: props.put("mail.smtp.auth", "true");
245: }
246: }
247:
248: if (smtp_from != null) {
249: props.put("mail.smtp.from", smtp_from);
250: }
251:
252: session = Session.getDefaultInstance(props,
253: null);
254: }
255:
256: // send the flagged messages
257: SendEmails send_emails = new SendEmails(datasource,
258: session, emails, smtp_server, Integer
259: .parseInt(smtp_port),
260: smtp_username, smtp_password, smtp_ssl);
261: emails.restore(emails.getRestoreQuery().where(
262: "queueFlag", "=", true), send_emails);
263: Logger.getLogger("com.uwyn.rife.mail").info(
264: String.valueOf(mMailqueueName)
265: + send_emails.getCount()
266: + " mails were sent");
267: } finally {
268: sRunning = false;
269: }
270: } else {
271: Logger
272: .getLogger("com.uwyn.rife.mail")
273: .finest(
274: mMailqueueName
275: + "MailQueueExecutor is currently RUNNING, not running another");
276: }
277: }
278:
279: Logger.getLogger("com.uwyn.rife.mail").finest(
280: mMailqueueName + "MailQueueExecutor run finished.");
281:
282: return true;
283: }
284:
285: public String getHandledTasktype() {
286: return "MailQueue";
287: }
288:
289: class SendEmails extends DbBeanFetcher<Email> {
290: private Session mSession = null;
291: private GenericQueryManager<Email> mEmailManager = null;
292: private int mCount = 0;
293: private String mHost = null;
294: private int mPort = 0;
295: private String mUsername = null;
296: private String mPassword = null;
297: private boolean mSsl = false;
298:
299: SendEmails(Datasource datasource, Session session,
300: GenericQueryManager<Email> emails, String host,
301: int port, String username, String password, boolean ssl) {
302: super (datasource, Email.class);
303: mSession = session;
304: mEmailManager = emails;
305: mHost = host;
306: mPort = port;
307: mUsername = username;
308: mPassword = password;
309: mSsl = ssl;
310: }
311:
312: int getCount() {
313: return mCount;
314: }
315:
316: public boolean gotBeanInstance(Email email) {
317: if (null == mSession) {
318: Logger
319: .getLogger("com.uwyn.rife.mail")
320: .warning(
321: mMailqueueName
322: + "Email not sent since no 'smtp_server' task option has not been provided to the MailQueueExecutor.\n"
323: + email.toString());
324: return false;
325: }
326:
327: if (!isDeliveryAllowed(email)) {
328: return false;
329: }
330:
331: MimeMessage message;
332: try {
333: // try to detect if a raw javamail message has been queued
334: if (email.getFromAddress().equals(
335: MimeMessage.class.getName())
336: && email.getToAddresses().equals(
337: MimeMessage.class.getName())
338: && email.getSubject().equals(
339: MimeMessage.class.getName())) {
340: ByteArrayInputStream in = new ByteArrayInputStream(
341: Base64.decode(email.getBody()));
342: message = new MimeMessage(mSession, in);
343: } else {
344: message = new MimeMessage(mSession);
345:
346: // build the mail message
347: message.setFrom(new InternetAddress(email
348: .getFromAddress(), false));
349: message.setSubject(email.getSubject());
350: message.setText(email.getBody(), "UTF-8");
351: message.setSentDate(new Date());
352:
353: int to_count = 0;
354: Collection<String> to_list = StringUtils.split(
355: email.getToAddresses(), ",");
356: InternetAddress[] to_array = new InternetAddress[to_list
357: .size()];
358: for (String to : to_list) {
359: to_array[to_count++] = new InternetAddress(to,
360: false);
361: }
362: message.setRecipients(Message.RecipientType.TO,
363: to_array);
364:
365: int cc_count = 0;
366: Collection<String> cc_list = StringUtils.split(
367: email.getCcAddresses(), ",");
368: InternetAddress[] cc_array = new InternetAddress[cc_list
369: .size()];
370: for (String cc : cc_list) {
371: cc_array[cc_count++] = new InternetAddress(cc,
372: false);
373: }
374: message.setRecipients(Message.RecipientType.CC,
375: cc_array);
376:
377: int bcc_count = 0;
378: Collection<String> bcc_list = StringUtils.split(
379: email.getBccAddresses(), ",");
380: InternetAddress[] bcc_array = new InternetAddress[bcc_list
381: .size()];
382: for (String bcc : bcc_list) {
383: bcc_array[bcc_count++] = new InternetAddress(
384: bcc, false);
385: }
386: message.setRecipients(Message.RecipientType.BCC,
387: bcc_array);
388: }
389: } catch (MessagingException e) {
390: message = null;
391: Logger
392: .getLogger("com.uwyn.rife.mail")
393: .warning(
394: mMailqueueName
395: + "Email not sent due to an error while building the message.\n"
396: + email.toString()
397: + "\n"
398: + ExceptionUtils
399: .getExceptionStackTrace(e));
400: }
401:
402: // send the mail
403: if (message != null) {
404: try {
405: if (mUsername != null && mPassword != null) {
406: Transport transport = mSession
407: .getTransport(mSsl ? "smtps" : "smtp");
408: transport.connect(mHost, mPort, mUsername,
409: mPassword);
410: message.saveChanges();
411: transport.sendMessage(message, message
412: .getAllRecipients());
413: } else {
414: Transport.send(message);
415: }
416:
417: // removed the message
418: mEmailManager.delete(email.getId());
419:
420: mCount++;
421: try {
422: Thread.sleep(300);
423: if (0 == (mCount % 100)) {
424: Thread.sleep(10000);
425: }
426: } catch (InterruptedException e3) {
427: // do nothing
428: }
429: } catch (MessagingException e) {
430: Logger
431: .getLogger("com.uwyn.rife.mail")
432: .severe(
433: mMailqueueName
434: + "Unexpected error while sending the MailQueueExecutor's email message \n"
435: + email.toString()
436: + "\n"
437: + ExceptionUtils
438: .getExceptionStackTrace(e));
439: }
440: }
441:
442: return true;
443: }
444: }
445: }
|