001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.mailrepository;
019:
020: import org.apache.avalon.framework.configuration.Configuration;
021: import org.apache.avalon.framework.configuration.ConfigurationException;
022:
023: import org.apache.james.services.SpoolRepository;
024: import org.apache.mailet.Mail;
025:
026: import java.sql.Connection;
027: import java.sql.PreparedStatement;
028: import java.sql.ResultSet;
029: import java.sql.SQLException;
030: import java.util.LinkedList;
031:
032: /**
033: * Implementation of a SpoolRepository on a database.
034: *
035: * <p>Requires a configuration element in the .conf.xml file of the form:
036: * <br>&lt;repository destinationURL="town://path"
037: * <br> type="MAIL"
038: * <br> model="SYNCHRONOUS"&gt;
039: * <br> &lt;driver&gt;sun.jdbc.odbc.JdbcOdbcDriver&lt;/driver&gt;
040: * <br> &lt;conn&gt;jdbc:odbc:LocalDB&lt;/conn&gt;
041: * <br> &lt;table&gt;Message&lt;/table&gt;
042: * <br>&lt;/repository&gt;
043: * <p>destinationURL specifies..(Serge??)
044: * <br>Type can be SPOOL or MAIL
045: * <br>Model is currently not used and may be dropped
046: * <br>conn is the location of the ...(Serge)
047: * <br>table is the name of the table in the Database to be used
048: *
049: * <p>Requires a logger called MailRepository.
050: *
051: * <p>Approach for spool manager:
052: *
053: * PendingMessage inner class
054: *
055: * accept() is called....
056: * checks whether needs to load PendingMessages()
057: * tries to get a message()
058: * if none, wait 60 seconds
059: *
060: * accept(long) is called
061: * checks whether needs to load PendingMessages
062: * tries to get a message(long)
063: * if none, wait accordingly
064: *
065: * synchronized check of whether PendingMessages need to be loaded
066: * if the pending messages include any awaiting immediate processing, return immediately
067: * if the query was run within the last WAIT_LIMIT period, return immediately
068: * query and build 2 vectors of pending messages:
069: * ones that need immediate processing
070: * ones that are delayed; put them in time order
071: * return
072: *
073: * get_a_message()
074: * loop through immediate messages.
075: * - remove top message
076: * - try to lock. if successful, return. otherwise loop.
077: * if nothing, return null
078: *
079: * get_a_message(long)
080: * try get_a_message()
081: * check top message in pending. if ready, then remove, try to lock, return if lock.
082: * return null.
083: *
084: *
085: * @version 1.0.0, 24/04/1999
086: */
087: public class JDBCSpoolRepository extends JDBCMailRepository implements
088: SpoolRepository {
089:
090: /**
091: * How long a thread should sleep when there are no messages to process.
092: */
093: private static int WAIT_LIMIT = 60000;
094: /**
095: * How long we have to wait before reloading the list of pending messages
096: */
097: private static int LOAD_TIME_MININUM = 1000;
098: /**
099: * A queue in memory of messages that need processing
100: */
101: private LinkedList pendingMessages = new LinkedList();
102: /**
103: * When the queue was last read
104: */
105: private long pendingMessagesLoadTime = 0;
106: /**
107: * Maximum size of the pendingMessages queue
108: */
109: private int maxPendingMessages = 0;
110:
111: /**
112: * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
113: */
114: public void configure(Configuration conf)
115: throws ConfigurationException {
116: super .configure(conf);
117: maxPendingMessages = conf.getChild("maxcache")
118: .getValueAsInteger(1000);
119: }
120:
121: /**
122: * Return a message to process. This is a message in the spool that is not locked.
123: */
124: public synchronized Mail accept() throws InterruptedException {
125: return accept(new SpoolRepository.AcceptFilter() {
126: public boolean accept(String _, String __, long ___,
127: String ____) {
128: return true;
129: }
130:
131: public long getWaitTime() {
132: return 0;
133: }
134: });
135: }
136:
137: /**
138: * Return a message that's ready to process. If a message is of type "error"
139: * then check the last updated time, and don't try it until the long 'delay' parameter
140: * milliseconds has passed.
141: */
142: public synchronized Mail accept(final long delay)
143: throws InterruptedException {
144: return accept(new SpoolRepository.AcceptFilter() {
145: long sleepUntil = 0;
146:
147: public boolean accept(String key, String state,
148: long lastUpdated, String errorMessage) {
149: if (Mail.ERROR.equals(state)) {
150: //if it's an error message, test the time
151: long processingTime = delay + lastUpdated;
152: if (processingTime < System.currentTimeMillis()) {
153: //It's time to process
154: return true;
155: } else {
156: //We don't process this, but we want to possibly reduce the amount of time
157: // we sleep so we wake when this message is ready.
158: if (sleepUntil == 0
159: || processingTime < sleepUntil) {
160: sleepUntil = processingTime;
161: }
162: return false;
163: }
164: } else {
165: return true;
166: }
167: }
168:
169: public long getWaitTime() {
170: if (sleepUntil == 0) {
171: // in AvalonSpoolRepository we return 0: why do we change sleepUntil?
172: // sleepUntil = System.currentTimeMillis();
173: return 0;
174: }
175: long waitTime = sleepUntil - System.currentTimeMillis();
176: sleepUntil = 0;
177: return waitTime <= 0 ? 1 : waitTime;
178: }
179:
180: });
181: }
182:
183: /**
184: * Returns an arbitrarily selected mail deposited in this Repository for
185: * which the supplied filter's accept method returns true.
186: * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
187: * based on number of retries if the mail is ready for processing.
188: * If no message is ready the method will block until one is, the amount of time to block is
189: * determined by calling the filters getWaitTime method.
190: *
191: * @return the mail
192: */
193: public synchronized Mail accept(SpoolRepository.AcceptFilter filter)
194: throws InterruptedException {
195: while (!Thread.currentThread().isInterrupted()) {
196: //Loop through until we are either out of pending messages or have a message
197: // that we can lock
198: PendingMessage next = null;
199: while ((next = getNextPendingMessage(filter)) != null
200: && !Thread.currentThread().isInterrupted()) {
201: //Check whether this is time to expire
202:
203: // boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
204:
205: if (/*shouldProcess && */lock(next.key)) {
206: try {
207: Mail mail = retrieve(next.key);
208: // Retrieve can return null if the mail is no longer on the spool
209: // (i.e. another thread has gotten to it first).
210: // In this case we simply continue to the next key
211: if (mail == null) {
212: unlock(next.key);
213: continue;
214: }
215: return mail;
216: } catch (javax.mail.MessagingException e) {
217: unlock(next.key);
218: getLogger().error(
219: "Exception during retrieve -- skipping item "
220: + next.key, e);
221: }
222: }
223: }
224: //Nothing to do... sleep!
225: long wait_time = filter.getWaitTime();
226: if (wait_time <= 0) {
227: wait_time = WAIT_LIMIT;
228: }
229: try {
230: wait(wait_time);
231: } catch (InterruptedException ex) {
232: throw ex;
233: }
234: }
235: throw new InterruptedException();
236: }
237:
238: /**
239: * Needs to override this method and reset the time to load to zero.
240: * This will force a reload of the pending messages queue once that
241: * is empty... a message that gets added will sit here until that queue
242: * time has passed and the list is then reloaded.
243: */
244: public void store(Mail mc) throws javax.mail.MessagingException {
245: pendingMessagesLoadTime = 0;
246: super .store(mc);
247: }
248:
249: /**
250: * If not empty, gets the next pending message. Otherwise checks
251: * checks the last time pending messages was loaded and load if
252: * it's been more than 1 second (should be configurable).
253: */
254: private PendingMessage getNextPendingMessage(
255: SpoolRepository.AcceptFilter filter) {
256: synchronized (pendingMessages) {
257: if (pendingMessages.size() == 0
258: && pendingMessagesLoadTime < System
259: .currentTimeMillis()) {
260: // pendingMessagesLoadTime = LOAD_TIME_MININUM + System.currentTimeMillis();
261: loadPendingMessages(filter);
262: pendingMessagesLoadTime = Math.max(
263: filter.getWaitTime(), LOAD_TIME_MININUM)
264: + System.currentTimeMillis();
265: }
266:
267: if (pendingMessages.size() == 0) {
268: return null;
269: } else {
270: return (PendingMessage) pendingMessages.removeFirst();
271: }
272: }
273: }
274:
275: /**
276: * Retrieves the pending messages that are in the database
277: */
278: private void loadPendingMessages(SpoolRepository.AcceptFilter filter) {
279: //Loads a vector with PendingMessage objects
280: synchronized (pendingMessages) {
281: pendingMessages.clear();
282:
283: Connection conn = null;
284: PreparedStatement listMessages = null;
285: ResultSet rsListMessages = null;
286: try {
287: conn = datasource.getConnection();
288: listMessages = conn.prepareStatement(sqlQueries
289: .getSqlString("listMessagesSQL", true));
290: listMessages.setString(1, repositoryName);
291: // Too simplistic. When filtering, we may need to see
292: // more than just maxPendingMessages to load the
293: // cache, so just hope that the driver and server use
294: // cursors properly.
295: // --> listMessages.setMaxRows(maxPendingMessages);
296: rsListMessages = listMessages.executeQuery();
297: // Continue to have it loop through the list of messages until we hit
298: // a possible message, or we retrieve maxPendingMessages messages.
299: // This maxPendingMessages cap is to avoid loading thousands or
300: // hundreds of thousands of messages when the spool is enourmous.
301: while (rsListMessages.next()
302: && pendingMessages.size() < maxPendingMessages
303: && !Thread.currentThread().isInterrupted()) {
304: String key = rsListMessages.getString(1);
305: String state = rsListMessages.getString(2);
306: long lastUpdated = rsListMessages.getTimestamp(3)
307: .getTime();
308: String errorMessage = rsListMessages.getString(4);
309: if (filter.accept(key, state, lastUpdated,
310: errorMessage)) {
311: pendingMessages.add(new PendingMessage(key,
312: state, lastUpdated, errorMessage));
313: }
314: }
315: } catch (SQLException sqle) {
316: //Log it and avoid reloading for a bit
317: getLogger().error("Error retrieving pending messages",
318: sqle);
319: pendingMessagesLoadTime = LOAD_TIME_MININUM * 10
320: + System.currentTimeMillis();
321: } finally {
322: theJDBCUtil.closeJDBCResultSet(rsListMessages);
323: theJDBCUtil.closeJDBCStatement(listMessages);
324: theJDBCUtil.closeJDBCConnection(conn);
325: }
326: }
327: }
328:
329: /**
330: * Simple class to hold basic information about a message in the spool
331: */
332: class PendingMessage {
333: protected String key;
334: protected String state;
335: protected long lastUpdated;
336: protected String errorMessage;
337:
338: public PendingMessage(String key, String state,
339: long lastUpdated, String errorMessage) {
340: this.key = key;
341: this.state = state;
342: this.lastUpdated = lastUpdated;
343: this.errorMessage = errorMessage;
344: }
345: }
346: }
|