001: /*
002: *
003: * Jsmtpd, Java SMTP daemon
004: * Copyright (C) 2005 Jean-Francois POUX, jf.poux@laposte.net
005: *
006: * This program is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU General Public License
008: * as published by the Free Software Foundation; either version 2
009: * of the License, or (at your option) any later version.
010: *
011: * This program is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
014: * GNU General Public License for more details.
015: *
016: * You should have received a copy of the GNU General Public License
017: * along with this program; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
019: *
020: */
021: package org.jsmtpd.core.send;
022:
023: import java.io.File;
024: import java.io.FileFilter;
025: import java.io.IOException;
026: import java.util.Collections;
027: import java.util.Iterator;
028: import java.util.LinkedList;
029: import java.util.List;
030:
031: import org.apache.commons.logging.Log;
032: import org.apache.commons.logging.LogFactory;
033: import org.jsmtpd.config.ReadConfig;
034: import org.jsmtpd.core.mail.Email;
035:
036: /**
037: * queueMail holds a number of pending mails loaded (for perfs reasons)
038: * others are stored on disk<br>
039: *
040: * when a mail has to be requeued, it is directly writen to disk
041: *
042: * @author Jean-Francois POUX
043: */
044: public class QueueService {
045:
046: /**
047: * mail here will be kept loaded
048: */
049: private List<Email> directService = Collections
050: .synchronizedList(new LinkedList<Email>());
051: private static QueueService instance = null;
052: private String tempPath;
053: private File retryDir;
054: private File pendingDir;
055: private Log log = LogFactory.getLog(QueueService.class);
056: private boolean running = true;
057: private long retryDelay;
058: private long currentDiskSize = 0;
059: private long maxDiskSize = 0;
060: // Optim
061:
062: private int fastServ;
063:
064: private boolean safeMode = false;
065: private String safeModePath = "";
066: private FileFilter filter = new EmailFileFilter();
067:
068: public static synchronized QueueService getInstance() {
069: if (instance == null)
070: instance = new QueueService();
071: return instance;
072: }
073:
074: /**
075: * Adds a mail on the queue, in ram if the server can afford it, or written on disk
076: * @param input the mail to queue for delivery
077: * @return false if it can't be queued
078: */
079: public synchronized boolean queueMail(Email input) {
080: if (running == false)
081: return false;
082: log.debug("Queuing new mail " + input.getDiskName()
083: + ", Disk usage " + getStorageStats());
084: // if safe mode, write down the mail to specified location.
085: if (safeMode) {
086: try {
087: Email.save(safeModePath + "/" + input.getDiskName(),
088: input);
089: log.info("SafeMode on : written mail to "
090: + safeModePath + "/" + input.getDiskName());
091: } catch (IOException e) {
092: log.error("SafeMode on : can't write mail to "
093: + safeModePath + "/" + input.getDiskName(), e);
094: }
095: }
096: // We keep big mails on disk, whereas small are kept in ram
097: if ((directService.size() < 20) && (input.getSize() < 512000)) {
098: directService.add(input);
099: return true;
100: } else {
101: if ((directService.size() < 5)
102: && (input.getSize() > 512000)) { // If we have a low number of mail on queue, keep big ones anyway
103: directService.add(input);
104: return true;
105: } else {
106: if ((currentDiskSize + input.getSize()) > maxDiskSize) {
107: log
108: .warn("Can't store anymore incomming mails, storage size exceeded");
109: return false;
110: }
111: try {
112: Email.save(tempPath + "mqueue/pending/"
113: + input.getDiskName(), input);
114: File tmp = new File(tempPath + "mqueue/pending/"
115: + input.getDiskName());
116: currentDiskSize += tmp.length();
117: return true;
118: } catch (IOException e) {
119: }
120: }
121:
122: return false;
123: }
124: }
125:
126: /**
127: * If a mail temporary fails, this method will store it for later delivery
128: * @param input the mail to requeue
129: * @return false if can't be queued
130: */
131: public synchronized boolean requeueMail(Email input) {
132: if (running == false)
133: return false;
134: log.debug("DSVC> Re-queuing new mail " + input.getDiskName()
135: + ", Disk usage " + getStorageStats());
136: try {
137: input.increaseAttempts();
138: Email.save(
139: tempPath + "mqueue/retry/" + input.getDiskName(),
140: input);
141: File tmp = new File(tempPath + "mqueue/retry/"
142: + input.getDiskName());
143: currentDiskSize += tmp.length();
144: return true;
145: } catch (IOException e) {
146:
147: }
148: return false;
149: }
150:
151: /**
152: * Gets a mail for delivering<br>
153: * Mail is picked on retried mails, then from the RAM linked list, and from the pending queue on disk
154: * @return the email instance picked from the queues
155: */
156: public synchronized Email getEmail() {
157: /**
158: * pickRetry();
159: * pickDirect();
160: * pickPending();
161: */
162: if (running == false)
163: return null;
164: Email tmp = null;
165: tmp = pickRetry();
166: if (tmp != null)
167: return tmp;
168:
169: if (directService.size() > 0) {
170: tmp = (Email) directService.remove(0);
171: return tmp;
172: }
173: tmp = pickPending();
174: if (tmp != null)
175: return tmp;
176:
177: return null;
178: }
179:
180: /**
181: * picks a mail in the pending mqueue directory
182: * @return instance of a mail, or null if queue empty
183: */
184: private Email pickPending() {
185:
186: File[] pendingMails = pendingDir.listFiles(filter);
187:
188: if ((pendingMails != null) && pendingMails.length != 0) {
189: try {
190: Email ret = Email.load(pendingMails[0].toString());
191: currentDiskSize -= pendingMails[0].length();
192: pendingMails[0].delete();
193: return ret;
194: } catch (IOException e) {
195: File tmp = new File(tempPath + "/"
196: + pendingMails[0].getName() + "-Bogus");
197: pendingMails[0].renameTo(tmp);
198: log.error("Cant load mail "
199: + pendingMails[0].toString() + ", error: " + e);
200: log.error("Mail moved to bogus");
201: }
202: }
203: return null;
204: }
205:
206: /**
207: * pick a mail in the retry mqueue folder
208: * @return the instance or null if queue is empty
209: */
210: private Email pickRetry() {
211: File[] pendingMails = retryDir.listFiles(filter);
212:
213: if (pendingMails != null) {
214: for (int i = 0; i < pendingMails.length; i++) {
215: long timeOffset = pendingMails[i].lastModified()
216: + retryDelay;
217: if (timeOffset < System.currentTimeMillis()) {
218: try {
219: Email ret = Email.load(pendingMails[0]
220: .toString());
221: currentDiskSize -= pendingMails[0].length();
222: pendingMails[0].delete();
223: return ret;
224: } catch (IOException e) {
225: File tmp = new File(tempPath + "/"
226: + pendingMails[0].getName() + "-Bogus");
227: pendingMails[0].renameTo(tmp);
228: log.error("Cant load mail "
229: + pendingMails[0].toString()
230: + ", error: " + e);
231: log.error("Mail moved to bogus");
232: }
233: }
234: }
235:
236: }
237: return null;
238: }
239:
240: private QueueService() {
241: ReadConfig cfg = ReadConfig.getInstance();
242: tempPath = cfg.getTempPath();
243: retryDelay = cfg.getDelayRetry() * 60 * 1000;
244: retryDir = new File(tempPath + "/mqueue/retry");
245: pendingDir = new File(tempPath + "/mqueue/pending");
246: fastServ = cfg.getDMaxInstances() * 2;
247: log.debug("Buffer set to " + fastServ
248: + " messages max for immediate processing");
249: maxDiskSize = cfg.getMaxTemporarySize() * 1048576;
250: initDiskCount(new File(tempPath));
251: log.debug("Storage usage " + getStorageStats());
252: safeMode = cfg.getSafeMode();
253: if (safeMode)
254: log.info("Safe Mode on");
255: safeModePath = cfg.getSafeModePath();
256: }
257:
258: private String getStorageStats() {
259: String out = Math.round((float) currentDiskSize / 1048576)
260: + "/" + Math.round((float) maxDiskSize / 1048576)
261: + " (in Mo)";
262: return out;
263: }
264:
265: private void initDiskCount(File input) {
266: if (input.isFile())
267: currentDiskSize += input.length();
268: else {
269: File[] sub = input.listFiles();
270: for (int i = 0; i < sub.length; i++) {
271: File file = sub[i];
272: initDiskCount(file);
273: }
274: }
275: }
276:
277: public void shutdownService() {
278: running = false;
279: for (Iterator iter = directService.iterator(); iter.hasNext();) {
280: Email element = (Email) iter.next();
281: try {
282: Email.save(tempPath + "mqueue/retry/"
283: + element.getDiskName(), element);
284: log.debug("Shutdown : " + element.getDiskName()
285: + " written to retry");
286: } catch (IOException e) {
287: log.debug("Cant save mail on shutdown, mail "
288: + element.getDiskName() + " is lost due to: "
289: + e.getCause());
290: }
291: }
292: }
293:
294: }
|