0001: /****************************************************************
0002: * Licensed to the Apache Software Foundation (ASF) under one *
0003: * or more contributor license agreements. See the NOTICE file *
0004: * distributed with this work for additional information *
0005: * regarding copyright ownership. The ASF licenses this file *
0006: * to you under the Apache License, Version 2.0 (the *
0007: * "License"); you may not use this file except in compliance *
0008: * with the License. You may obtain a copy of the License at *
0009: * *
0010: * http://www.apache.org/licenses/LICENSE-2.0 *
0011: * *
0012: * Unless required by applicable law or agreed to in writing, *
0013: * software distributed under the License is distributed on an *
0014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
0015: * KIND, either express or implied. See the License for the *
0016: * specific language governing permissions and limitations *
0017: * under the License. *
0018: ****************************************************************/package org.apache.james.mailrepository;
0019:
0020: import org.apache.avalon.cornerstone.services.datasources.DataSourceSelector;
0021: import org.apache.avalon.cornerstone.services.store.Store;
0022: import org.apache.avalon.cornerstone.services.store.StreamRepository;
0023: import org.apache.avalon.excalibur.datasource.DataSourceComponent;
0024: import org.apache.avalon.framework.activity.Initializable;
0025: import org.apache.avalon.framework.service.Serviceable;
0026: import org.apache.avalon.framework.service.ServiceManager;
0027: import org.apache.avalon.framework.service.ServiceException;
0028: import org.apache.avalon.framework.configuration.Configurable;
0029: import org.apache.avalon.framework.configuration.Configuration;
0030: import org.apache.avalon.framework.configuration.ConfigurationException;
0031: import org.apache.avalon.framework.configuration.DefaultConfiguration;
0032: import org.apache.avalon.framework.context.Context;
0033: import org.apache.avalon.framework.context.ContextException;
0034: import org.apache.avalon.framework.context.Contextualizable;
0035: import org.apache.avalon.framework.logger.AbstractLogEnabled;
0036: import org.apache.james.context.AvalonContextUtilities;
0037: import org.apache.james.core.MailImpl;
0038: import org.apache.james.core.MimeMessageCopyOnWriteProxy;
0039: import org.apache.james.core.MimeMessageWrapper;
0040: import org.apache.james.services.MailRepository;
0041: import org.apache.james.util.JDBCUtil;
0042: import org.apache.james.util.Lock;
0043: import org.apache.james.util.SqlResources;
0044: import org.apache.mailet.Mail;
0045: import org.apache.mailet.MailAddress;
0046:
0047: import javax.mail.MessagingException;
0048: import javax.mail.internet.MimeMessage;
0049:
0050: import java.io.ByteArrayInputStream;
0051: import java.io.ByteArrayOutputStream;
0052: import java.io.File;
0053: import java.io.IOException;
0054: import java.io.ObjectOutputStream;
0055: import java.io.ObjectInputStream;
0056: import java.sql.Blob;
0057: import java.sql.Connection;
0058: import java.sql.DatabaseMetaData;
0059: import java.sql.PreparedStatement;
0060: import java.sql.ResultSet;
0061: import java.sql.SQLException;
0062: import java.sql.Statement;
0063: import java.util.ArrayList;
0064: import java.util.Collection;
0065: import java.util.HashMap;
0066: import java.util.HashSet;
0067: import java.util.Iterator;
0068: import java.util.List;
0069: import java.util.Map;
0070: import java.util.Set;
0071: import java.util.StringTokenizer;
0072:
0073: /**
0074: * Implementation of a MailRepository on a database.
0075: *
0076: * <p>Requires a configuration element in the .conf.xml file of the form:
0077: * <br><repository destinationURL="db://<datasource>/<table_name>/<repository_name>"
0078: * <br> type="MAIL"
0079: * <br> model="SYNCHRONOUS"/>
0080: * <br></repository>
0081: * <p>destinationURL specifies..(Serge??)
0082: * <br>Type can be SPOOL or MAIL
0083: * <br>Model is currently not used and may be dropped
0084: *
0085: * <p>Requires a logger called MailRepository.
0086: *
0087: * @version CVS $Revision: 494012 $ $Date: 2007-01-08 11:23:58 +0100 (Mo, 08 Jan 2007) $
0088: */
0089: public class JDBCMailRepository extends AbstractLogEnabled implements
0090: MailRepository, Contextualizable, Serviceable, Configurable,
0091: Initializable {
0092:
0093: /**
0094: * Whether 'deep debugging' is turned on.
0095: */
0096: private static final boolean DEEP_DEBUG = false;
0097:
0098: /**
0099: * The Avalon componentManager used by the instance
0100: */
0101: private ServiceManager componentManager;
0102:
0103: /**
0104: * The Avalon context used by the instance
0105: */
0106: protected Context context;
0107:
0108: /**
0109: * A lock used to control access to repository elements, locking access
0110: * based on the key
0111: */
0112: private Lock lock;
0113:
0114: /**
0115: * The table name parsed from the destination URL
0116: */
0117: protected String tableName;
0118:
0119: /**
0120: * The repository name parsed from the destination URL
0121: */
0122: protected String repositoryName;
0123:
0124: /**
0125: * The name of the SQL configuration file to be used to configure this repository.
0126: */
0127: private String sqlFileName;
0128:
0129: /**
0130: * The stream repository used in dbfile mode
0131: */
0132: private StreamRepository sr = null;
0133:
0134: /**
0135: * The selector used to obtain the JDBC datasource
0136: */
0137: protected DataSourceSelector datasources;
0138:
0139: /**
0140: * The JDBC datasource that provides the JDBC connection
0141: */
0142: protected DataSourceComponent datasource;
0143:
0144: /**
0145: * The name of the datasource used by this repository
0146: */
0147: protected String datasourceName;
0148:
0149: /**
0150: * Contains all of the sql strings for this component.
0151: */
0152: protected SqlResources sqlQueries;
0153:
0154: /**
0155: * The JDBCUtil helper class
0156: */
0157: protected JDBCUtil theJDBCUtil;
0158:
0159: /**
0160: * "Support for Mail Attributes under JDBC repositories is ready" indicator.
0161: */
0162: protected boolean jdbcMailAttributesReady = false;
0163:
0164: /**
0165: * The size threshold for in memory handling of storing operations
0166: */
0167: private int inMemorySizeLimit;
0168:
0169: /**
0170: * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
0171: */
0172: public void contextualize(final Context context)
0173: throws ContextException {
0174: this .context = context;
0175: }
0176:
0177: /**
0178: * @see org.apache.avalon.framework.service.Servicable#service(ServiceManager)
0179: */
0180: public void service(final ServiceManager componentManager)
0181: throws ServiceException {
0182: StringBuffer logBuffer = null;
0183: if (getLogger().isDebugEnabled()) {
0184: logBuffer = new StringBuffer(64).append(
0185: this .getClass().getName()).append(".compose()");
0186: getLogger().debug(logBuffer.toString());
0187: }
0188: // Get the DataSourceSelector service
0189: datasources = (DataSourceSelector) componentManager
0190: .lookup(DataSourceSelector.ROLE);
0191: this .componentManager = componentManager;
0192:
0193: }
0194:
0195: /**
0196: * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
0197: */
0198: public void configure(Configuration conf)
0199: throws ConfigurationException {
0200: if (getLogger().isDebugEnabled()) {
0201: getLogger().debug(
0202: this .getClass().getName() + ".configure()");
0203: }
0204:
0205: String destination = conf.getAttribute("destinationURL");
0206: // normalize the destination, to simplify processing.
0207: if (!destination.endsWith("/")) {
0208: destination += "/";
0209: }
0210: // Parse the DestinationURL for the name of the datasource,
0211: // the table to use, and the (optional) repository Key.
0212: // Split on "/", starting after "db://"
0213: List urlParams = new ArrayList();
0214: int start = 5;
0215: if (destination.startsWith("dbfile")) {
0216: //this is dbfile:// instead of db://
0217: start += 4;
0218: }
0219: int end = destination.indexOf('/', start);
0220: while (end > -1) {
0221: urlParams.add(destination.substring(start, end));
0222: start = end + 1;
0223: end = destination.indexOf('/', start);
0224: }
0225:
0226: // Build SqlParameters and get datasource name from URL parameters
0227: if (urlParams.size() == 0) {
0228: StringBuffer exceptionBuffer = new StringBuffer(256)
0229: .append(
0230: "Malformed destinationURL - Must be of the format '")
0231: .append(
0232: "db://<data-source>[/<table>[/<repositoryName>]]'. Was passed ")
0233: .append(conf.getAttribute("destinationURL"));
0234: throw new ConfigurationException(exceptionBuffer.toString());
0235: }
0236: if (urlParams.size() >= 1) {
0237: datasourceName = (String) urlParams.get(0);
0238: }
0239: if (urlParams.size() >= 2) {
0240: tableName = (String) urlParams.get(1);
0241: }
0242: if (urlParams.size() >= 3) {
0243: repositoryName = "";
0244: for (int i = 2; i < urlParams.size(); i++) {
0245: if (i >= 3) {
0246: repositoryName += '/';
0247: }
0248: repositoryName += (String) urlParams.get(i);
0249: }
0250: }
0251:
0252: if (getLogger().isDebugEnabled()) {
0253: StringBuffer logBuffer = new StringBuffer(128).append(
0254: "Parsed URL: table = '").append(tableName).append(
0255: "', repositoryName = '").append(repositoryName)
0256: .append("'");
0257: getLogger().debug(logBuffer.toString());
0258: }
0259:
0260: inMemorySizeLimit = conf.getChild("inMemorySizeLimit")
0261: .getValueAsInteger(409600000);
0262:
0263: String filestore = conf.getChild("filestore").getValue(null);
0264: sqlFileName = conf.getChild("sqlFile").getValue();
0265: if (!sqlFileName.startsWith("file://")) {
0266: throw new ConfigurationException(
0267: "Malformed sqlFile - Must be of the format 'file://<filename>'.");
0268: }
0269: try {
0270: if (filestore != null) {
0271: Store store = (Store) componentManager
0272: .lookup(Store.ROLE);
0273: //prepare Configurations for stream repositories
0274: DefaultConfiguration streamConfiguration = new DefaultConfiguration(
0275: "repository",
0276: "generated:JDBCMailRepository.configure()");
0277:
0278: streamConfiguration.setAttribute("destinationURL",
0279: filestore);
0280: streamConfiguration.setAttribute("type", "STREAM");
0281: streamConfiguration
0282: .setAttribute("model", "SYNCHRONOUS");
0283: sr = (StreamRepository) store
0284: .select(streamConfiguration);
0285:
0286: if (getLogger().isDebugEnabled()) {
0287: getLogger().debug(
0288: "Got filestore for JdbcMailRepository: "
0289: + filestore);
0290: }
0291: }
0292:
0293: lock = new Lock();
0294: if (getLogger().isDebugEnabled()) {
0295: StringBuffer logBuffer = new StringBuffer(128).append(
0296: this .getClass().getName()).append(
0297: " created according to ").append(destination);
0298: getLogger().debug(logBuffer.toString());
0299: }
0300: } catch (Exception e) {
0301: final String message = "Failed to retrieve Store component:"
0302: + e.getMessage();
0303: getLogger().error(message, e);
0304: throw new ConfigurationException(message, e);
0305: }
0306: }
0307:
0308: /**
0309: * Initialises the JDBC repository.
0310: * 1) Tests the connection to the database.
0311: * 2) Loads SQL strings from the SQL definition file,
0312: * choosing the appropriate SQL for this connection,
0313: * and performing paramter substitution,
0314: * 3) Initialises the database with the required tables, if necessary.
0315: *
0316: * @throws Exception if an error occurs
0317: */
0318: public void initialize() throws Exception {
0319: StringBuffer logBuffer = null;
0320: if (getLogger().isDebugEnabled()) {
0321: getLogger().debug(
0322: this .getClass().getName() + ".initialize()");
0323: }
0324:
0325: theJDBCUtil = new JDBCUtil() {
0326: protected void delegatedLog(String logString) {
0327: JDBCMailRepository.this .getLogger().warn(
0328: "JDBCMailRepository: " + logString);
0329: }
0330: };
0331: // Get the data-source required.
0332: datasource = (DataSourceComponent) datasources
0333: .select(datasourceName);
0334:
0335: // Test the connection to the database, by getting the DatabaseMetaData.
0336: Connection conn = datasource.getConnection();
0337: PreparedStatement createStatement = null;
0338:
0339: try {
0340: // Initialise the sql strings.
0341:
0342: File sqlFile = null;
0343: try {
0344: sqlFile = AvalonContextUtilities.getFile(context,
0345: sqlFileName);
0346: sqlFileName = null;
0347: } catch (Exception e) {
0348: getLogger().fatalError(e.getMessage(), e);
0349: throw e;
0350: }
0351:
0352: if (getLogger().isDebugEnabled()) {
0353: logBuffer = new StringBuffer(128).append(
0354: "Reading SQL resources from file: ").append(
0355: sqlFile.getAbsolutePath()).append(", section ")
0356: .append(this .getClass().getName()).append(".");
0357: getLogger().debug(logBuffer.toString());
0358: }
0359:
0360: // Build the statement parameters
0361: Map sqlParameters = new HashMap();
0362: if (tableName != null) {
0363: sqlParameters.put("table", tableName);
0364: }
0365: if (repositoryName != null) {
0366: sqlParameters.put("repository", repositoryName);
0367: }
0368:
0369: sqlQueries = new SqlResources();
0370: sqlQueries.init(sqlFile, this .getClass().getName(), conn,
0371: sqlParameters);
0372:
0373: // Check if the required table exists. If not, create it.
0374: DatabaseMetaData dbMetaData = conn.getMetaData();
0375: // Need to ask in the case that identifiers are stored, ask the DatabaseMetaInfo.
0376: // Try UPPER, lower, and MixedCase, to see if the table is there.
0377: if (!(theJDBCUtil.tableExists(dbMetaData, tableName))) {
0378: // Users table doesn't exist - create it.
0379: createStatement = conn.prepareStatement(sqlQueries
0380: .getSqlString("createTable", true));
0381: createStatement.execute();
0382:
0383: if (getLogger().isInfoEnabled()) {
0384: logBuffer = new StringBuffer(64).append(
0385: "JdbcMailRepository: Created table '")
0386: .append(tableName).append("'.");
0387: getLogger().info(logBuffer.toString());
0388: }
0389: }
0390:
0391: checkJdbcAttributesSupport(dbMetaData);
0392:
0393: } finally {
0394: theJDBCUtil.closeJDBCStatement(createStatement);
0395: theJDBCUtil.closeJDBCConnection(conn);
0396: }
0397: }
0398:
0399: /** Checks whether support for JDBC Mail atributes is activated for this repository
0400: * and if everything is consistent.
0401: * Looks for both the "updateMessageAttributesSQL" and "retrieveMessageAttributesSQL"
0402: * statements in sqlResources and for a table column named "message_attributes".
0403: *
0404: * @param dbMetaData the database metadata to be used to look up the column
0405: * @throws SQLException if a fatal situation is met
0406: */
0407: protected void checkJdbcAttributesSupport(
0408: DatabaseMetaData dbMetaData) throws SQLException {
0409: String attributesColumnName = "message_attributes";
0410: boolean hasUpdateMessageAttributesSQL = false;
0411: boolean hasRetrieveMessageAttributesSQL = false;
0412:
0413: boolean hasMessageAttributesColumn = theJDBCUtil.columnExists(
0414: dbMetaData, tableName, attributesColumnName);
0415:
0416: StringBuffer logBuffer = new StringBuffer(64)
0417: .append("JdbcMailRepository '" + repositoryName
0418: + ", table '" + tableName + "': ");
0419:
0420: //Determine whether attributes are used and available for storing
0421: //Do we have updateMessageAttributesSQL?
0422: String updateMessageAttrSql = sqlQueries.getSqlString(
0423: "updateMessageAttributesSQL", false);
0424: if (updateMessageAttrSql != null) {
0425: hasUpdateMessageAttributesSQL = true;
0426: }
0427:
0428: //Determine whether attributes are used and retrieve them
0429: //Do we have retrieveAttributesSQL?
0430: String retrieveMessageAttrSql = sqlQueries.getSqlString(
0431: "retrieveMessageAttributesSQL", false);
0432: if (retrieveMessageAttrSql != null) {
0433: hasRetrieveMessageAttributesSQL = true;
0434: }
0435:
0436: if (hasUpdateMessageAttributesSQL
0437: && !hasRetrieveMessageAttributesSQL) {
0438: logBuffer
0439: .append("JDBC Mail Attributes support was activated for update but not for retrieval"
0440: + "(found 'updateMessageAttributesSQL' but not 'retrieveMessageAttributesSQL'"
0441: + "in table '" + tableName + "').");
0442: getLogger().fatalError(logBuffer.toString());
0443: throw new SQLException(logBuffer.toString());
0444: }
0445: if (!hasUpdateMessageAttributesSQL
0446: && hasRetrieveMessageAttributesSQL) {
0447: logBuffer
0448: .append("JDBC Mail Attributes support was activated for retrieval but not for update"
0449: + "(found 'retrieveMessageAttributesSQL' but not 'updateMessageAttributesSQL'"
0450: + "in table '" + tableName + "'.");
0451: getLogger().fatalError(logBuffer.toString());
0452: throw new SQLException(logBuffer.toString());
0453: }
0454: if (!hasMessageAttributesColumn
0455: && (hasUpdateMessageAttributesSQL || hasRetrieveMessageAttributesSQL)) {
0456: logBuffer
0457: .append("JDBC Mail Attributes support was activated but column '"
0458: + attributesColumnName
0459: + "' is missing in table '"
0460: + tableName
0461: + "'.");
0462: getLogger().fatalError(logBuffer.toString());
0463: throw new SQLException(logBuffer.toString());
0464: }
0465: if (hasUpdateMessageAttributesSQL
0466: && hasRetrieveMessageAttributesSQL) {
0467: jdbcMailAttributesReady = true;
0468: if (getLogger().isInfoEnabled()) {
0469: logBuffer.append("JDBC Mail Attributes support ready.");
0470: getLogger().info(logBuffer.toString());
0471: }
0472: } else {
0473: jdbcMailAttributesReady = false;
0474: logBuffer
0475: .append("JDBC Mail Attributes support not activated. "
0476: + "Missing both 'updateMessageAttributesSQL' "
0477: + "and 'retrieveMessageAttributesSQL' "
0478: + "statements for table '"
0479: + tableName
0480: + "' in sqlResources.xml. "
0481: + "Will not persist in the repository '"
0482: + repositoryName + "'.");
0483: getLogger().warn(logBuffer.toString());
0484: }
0485: }
0486:
0487: /**
0488: * Releases a lock on a message identified by a key
0489: *
0490: * @param key the key of the message to be unlocked
0491: *
0492: * @return true if successfully released the lock, false otherwise
0493: */
0494: public boolean unlock(String key) {
0495: if (lock.unlock(key)) {
0496: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
0497: StringBuffer debugBuffer = new StringBuffer(256)
0498: .append("Unlocked ").append(key)
0499: .append(" for ").append(
0500: Thread.currentThread().getName())
0501: .append(" @ ").append(
0502: new java.util.Date(System
0503: .currentTimeMillis()));
0504: getLogger().debug(debugBuffer.toString());
0505: }
0506: return true;
0507: } else {
0508: return false;
0509: }
0510: }
0511:
0512: /**
0513: * Obtains a lock on a message identified by a key
0514: *
0515: * @param key the key of the message to be locked
0516: *
0517: * @return true if successfully obtained the lock, false otherwise
0518: */
0519: public boolean lock(String key) {
0520: if (lock.lock(key)) {
0521: if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
0522: StringBuffer debugBuffer = new StringBuffer(256)
0523: .append("Locked ").append(key).append(" for ")
0524: .append(Thread.currentThread().getName())
0525: .append(" @ ").append(
0526: new java.util.Date(System
0527: .currentTimeMillis()));
0528: getLogger().debug(debugBuffer.toString());
0529: }
0530: return true;
0531: } else {
0532: return false;
0533: }
0534: }
0535:
0536: /**
0537: * Store this message to the database. Optionally stores the message
0538: * body to the filesystem and only writes the headers to the database.
0539: */
0540: public void store(Mail mc) throws MessagingException {
0541: Connection conn = null;
0542: boolean wasLocked = true;
0543: String key = mc.getName();
0544: try {
0545: synchronized (this ) {
0546: wasLocked = lock.isLocked(key);
0547:
0548: if (!wasLocked) {
0549: //If it wasn't locked, we want a lock during the store
0550: lock(key);
0551: }
0552: }
0553: conn = datasource.getConnection();
0554:
0555: //Need to determine whether need to insert this record, or update it.
0556:
0557: //Begin a transaction
0558: conn.setAutoCommit(false);
0559:
0560: PreparedStatement checkMessageExists = null;
0561: ResultSet rsExists = null;
0562: boolean exists = false;
0563: try {
0564: checkMessageExists = conn.prepareStatement(sqlQueries
0565: .getSqlString("checkMessageExistsSQL", true));
0566: checkMessageExists.setString(1, mc.getName());
0567: checkMessageExists.setString(2, repositoryName);
0568: rsExists = checkMessageExists.executeQuery();
0569: exists = rsExists.next() && rsExists.getInt(1) > 0;
0570: } finally {
0571: theJDBCUtil.closeJDBCResultSet(rsExists);
0572: theJDBCUtil.closeJDBCStatement(checkMessageExists);
0573: }
0574:
0575: if (exists) {
0576: //Update the existing record
0577: PreparedStatement updateMessage = null;
0578:
0579: try {
0580: updateMessage = conn.prepareStatement(sqlQueries
0581: .getSqlString("updateMessageSQL", true));
0582: updateMessage.setString(1, mc.getState());
0583: updateMessage.setString(2, mc.getErrorMessage());
0584: if (mc.getSender() == null) {
0585: updateMessage
0586: .setNull(3, java.sql.Types.VARCHAR);
0587: } else {
0588: updateMessage.setString(3, mc.getSender()
0589: .toString());
0590: }
0591: StringBuffer recipients = new StringBuffer();
0592: for (Iterator i = mc.getRecipients().iterator(); i
0593: .hasNext();) {
0594: recipients.append(i.next().toString());
0595: if (i.hasNext()) {
0596: recipients.append("\r\n");
0597: }
0598: }
0599: updateMessage.setString(4, recipients.toString());
0600: updateMessage.setString(5, mc.getRemoteHost());
0601: updateMessage.setString(6, mc.getRemoteAddr());
0602: updateMessage.setTimestamp(7,
0603: new java.sql.Timestamp(mc.getLastUpdated()
0604: .getTime()));
0605: updateMessage.setString(8, mc.getName());
0606: updateMessage.setString(9, repositoryName);
0607: updateMessage.execute();
0608: } finally {
0609: Statement localUpdateMessage = updateMessage;
0610: // Clear reference to statement
0611: updateMessage = null;
0612: theJDBCUtil.closeJDBCStatement(localUpdateMessage);
0613: }
0614:
0615: //Determine whether attributes are used and available for storing
0616: if (jdbcMailAttributesReady && mc.hasAttributes()) {
0617: String updateMessageAttrSql = sqlQueries
0618: .getSqlString("updateMessageAttributesSQL",
0619: false);
0620: PreparedStatement updateMessageAttr = null;
0621: try {
0622: updateMessageAttr = conn
0623: .prepareStatement(updateMessageAttrSql);
0624: ByteArrayOutputStream baos = new ByteArrayOutputStream();
0625: ObjectOutputStream oos = new ObjectOutputStream(
0626: baos);
0627: try {
0628: if (mc instanceof MailImpl) {
0629: oos.writeObject(((MailImpl) mc)
0630: .getAttributesRaw());
0631: } else {
0632: HashMap temp = new HashMap();
0633: for (Iterator i = mc
0634: .getAttributeNames(); i
0635: .hasNext();) {
0636: String hashKey = (String) i.next();
0637: temp.put(hashKey, mc
0638: .getAttribute(hashKey));
0639: }
0640: oos.writeObject(temp);
0641: }
0642: oos.flush();
0643: ByteArrayInputStream attrInputStream = new ByteArrayInputStream(
0644: baos.toByteArray());
0645: updateMessageAttr.setBinaryStream(1,
0646: attrInputStream, baos.size());
0647: } finally {
0648: try {
0649: if (oos != null) {
0650: oos.close();
0651: }
0652: } catch (IOException ioe) {
0653: getLogger()
0654: .debug(
0655: "JDBCMailRepository: Unexpected exception while closing output stream.",
0656: ioe);
0657: }
0658: }
0659: updateMessageAttr.setString(2, mc.getName());
0660: updateMessageAttr.setString(3, repositoryName);
0661: updateMessageAttr.execute();
0662: } catch (SQLException sqle) {
0663: getLogger()
0664: .info(
0665: "JDBCMailRepository: Trying to update mail attributes failed.",
0666: sqle);
0667:
0668: } finally {
0669: theJDBCUtil
0670: .closeJDBCStatement(updateMessageAttr);
0671: }
0672: }
0673:
0674: //Determine whether the message body has changed, and possibly avoid
0675: // updating the database.
0676: MimeMessage messageBody = mc.getMessage();
0677: boolean saveBody = false;
0678: // if the message is a CopyOnWrite proxy we check the modified wrapped object.
0679: if (messageBody instanceof MimeMessageCopyOnWriteProxy) {
0680: MimeMessageCopyOnWriteProxy messageCow = (MimeMessageCopyOnWriteProxy) messageBody;
0681: messageBody = messageCow.getWrappedMessage();
0682: }
0683: if (messageBody instanceof MimeMessageWrapper) {
0684: MimeMessageWrapper message = (MimeMessageWrapper) messageBody;
0685: saveBody = message.isModified();
0686: } else {
0687: saveBody = true;
0688: }
0689:
0690: if (saveBody) {
0691: PreparedStatement updateMessageBody = conn
0692: .prepareStatement(sqlQueries.getSqlString(
0693: "updateMessageBodySQL", true));
0694: try {
0695: MessageInputStream is = new MessageInputStream(
0696: mc, sr, inMemorySizeLimit);
0697: updateMessageBody.setBinaryStream(1, is,
0698: (int) is.getSize());
0699: updateMessageBody.setString(2, mc.getName());
0700: updateMessageBody.setString(3, repositoryName);
0701: updateMessageBody.execute();
0702:
0703: } finally {
0704: theJDBCUtil
0705: .closeJDBCStatement(updateMessageBody);
0706: }
0707: }
0708:
0709: } else {
0710: //Insert the record into the database
0711: PreparedStatement insertMessage = null;
0712: try {
0713: String insertMessageSQL = sqlQueries.getSqlString(
0714: "insertMessageSQL", true);
0715: int number_of_parameters = getNumberOfParameters(insertMessageSQL);
0716: insertMessage = conn
0717: .prepareStatement(insertMessageSQL);
0718: insertMessage.setString(1, mc.getName());
0719: insertMessage.setString(2, repositoryName);
0720: insertMessage.setString(3, mc.getState());
0721: insertMessage.setString(4, mc.getErrorMessage());
0722: if (mc.getSender() == null) {
0723: insertMessage
0724: .setNull(5, java.sql.Types.VARCHAR);
0725: } else {
0726: insertMessage.setString(5, mc.getSender()
0727: .toString());
0728: }
0729: StringBuffer recipients = new StringBuffer();
0730: for (Iterator i = mc.getRecipients().iterator(); i
0731: .hasNext();) {
0732: recipients.append(i.next().toString());
0733: if (i.hasNext()) {
0734: recipients.append("\r\n");
0735: }
0736: }
0737: insertMessage.setString(6, recipients.toString());
0738: insertMessage.setString(7, mc.getRemoteHost());
0739: insertMessage.setString(8, mc.getRemoteAddr());
0740: insertMessage.setTimestamp(9,
0741: new java.sql.Timestamp(mc.getLastUpdated()
0742: .getTime()));
0743:
0744: MessageInputStream is = new MessageInputStream(mc,
0745: sr, inMemorySizeLimit);
0746:
0747: insertMessage.setBinaryStream(10, is, (int) is
0748: .getSize());
0749:
0750: //Store attributes
0751: if (number_of_parameters > 10) {
0752: ByteArrayOutputStream baos = new ByteArrayOutputStream();
0753: ObjectOutputStream oos = new ObjectOutputStream(
0754: baos);
0755: try {
0756: if (mc instanceof MailImpl) {
0757: oos.writeObject(((MailImpl) mc)
0758: .getAttributesRaw());
0759: } else {
0760: HashMap temp = new HashMap();
0761: for (Iterator i = mc
0762: .getAttributeNames(); i
0763: .hasNext();) {
0764: String hashKey = (String) i.next();
0765: temp.put(hashKey, mc
0766: .getAttribute(hashKey));
0767: }
0768: oos.writeObject(temp);
0769: }
0770: oos.flush();
0771: ByteArrayInputStream attrInputStream = new ByteArrayInputStream(
0772: baos.toByteArray());
0773: insertMessage.setBinaryStream(11,
0774: attrInputStream, baos.size());
0775: } finally {
0776: try {
0777: if (oos != null) {
0778: oos.close();
0779: }
0780: } catch (IOException ioe) {
0781: getLogger()
0782: .debug(
0783: "JDBCMailRepository: Unexpected exception while closing output stream.",
0784: ioe);
0785: }
0786: }
0787: }
0788:
0789: insertMessage.execute();
0790: } finally {
0791: theJDBCUtil.closeJDBCStatement(insertMessage);
0792: }
0793: }
0794:
0795: conn.commit();
0796: conn.setAutoCommit(true);
0797:
0798: } catch (Exception e) {
0799: getLogger().error(
0800: "Exception caught while storing mail Container", e);
0801: throw new MessagingException(
0802: "Exception caught while storing mail Container: ",
0803: e);
0804: } finally {
0805: theJDBCUtil.closeJDBCConnection(conn);
0806: if (!wasLocked) {
0807: // If it wasn't locked, we need to unlock now
0808: unlock(key);
0809: synchronized (this ) {
0810: notify();
0811: }
0812: }
0813: }
0814: }
0815:
0816: /**
0817: * Retrieves a message given a key. At the moment, keys can be obtained
0818: * from list()
0819: *
0820: * @param key the key of the message to retrieve
0821: * @return the mail corresponding to this key, null if none exists
0822: */
0823: public Mail retrieve(String key) throws MessagingException {
0824: if (DEEP_DEBUG) {
0825: System.err.println("retrieving " + key);
0826: }
0827: Connection conn = null;
0828: PreparedStatement retrieveMessage = null;
0829: ResultSet rsMessage = null;
0830: try {
0831: conn = datasource.getConnection();
0832: if (DEEP_DEBUG) {
0833: System.err.println("got a conn " + key);
0834: }
0835:
0836: retrieveMessage = conn.prepareStatement(sqlQueries
0837: .getSqlString("retrieveMessageSQL", true));
0838: retrieveMessage.setString(1, key);
0839: retrieveMessage.setString(2, repositoryName);
0840: rsMessage = retrieveMessage.executeQuery();
0841: if (DEEP_DEBUG) {
0842: System.err.println("ran the query " + key);
0843: }
0844: if (!rsMessage.next()) {
0845: if (getLogger().isDebugEnabled()) {
0846: StringBuffer debugBuffer = new StringBuffer(64)
0847: .append("Did not find a record ").append(
0848: key).append(" in ").append(
0849: repositoryName);
0850: getLogger().debug(debugBuffer.toString());
0851: }
0852: return null;
0853: }
0854: //Determine whether attributes are used and retrieve them
0855: PreparedStatement retrieveMessageAttr = null;
0856: HashMap attributes = null;
0857: if (jdbcMailAttributesReady) {
0858: String retrieveMessageAttrSql = sqlQueries
0859: .getSqlString("retrieveMessageAttributesSQL",
0860: false);
0861: ResultSet rsMessageAttr = null;
0862: try {
0863: retrieveMessageAttr = conn
0864: .prepareStatement(retrieveMessageAttrSql);
0865:
0866: retrieveMessageAttr.setString(1, key);
0867: retrieveMessageAttr.setString(2, repositoryName);
0868: rsMessageAttr = retrieveMessageAttr.executeQuery();
0869:
0870: if (rsMessageAttr.next()) {
0871: try {
0872: byte[] serialized_attr = null;
0873: String getAttributesOption = sqlQueries
0874: .getDbOption("getAttributes");
0875: if (getAttributesOption != null
0876: && (getAttributesOption
0877: .equalsIgnoreCase("useBlob") || getAttributesOption
0878: .equalsIgnoreCase("useBinaryStream"))) {
0879: Blob b = rsMessageAttr.getBlob(1);
0880: serialized_attr = b.getBytes(1, (int) b
0881: .length());
0882: } else {
0883: serialized_attr = rsMessageAttr
0884: .getBytes(1);
0885: }
0886: // this check is for better backwards compatibility
0887: if (serialized_attr != null) {
0888: ByteArrayInputStream bais = new ByteArrayInputStream(
0889: serialized_attr);
0890: ObjectInputStream ois = new ObjectInputStream(
0891: bais);
0892: attributes = (HashMap) ois.readObject();
0893: ois.close();
0894: }
0895: } catch (IOException ioe) {
0896: if (getLogger().isDebugEnabled()) {
0897: StringBuffer debugBuffer = new StringBuffer(
0898: 64)
0899: .append(
0900: "Exception reading attributes ")
0901: .append(key).append(" in ")
0902: .append(repositoryName);
0903: getLogger().debug(
0904: debugBuffer.toString(), ioe);
0905: }
0906: }
0907: } else {
0908: if (getLogger().isDebugEnabled()) {
0909: StringBuffer debugBuffer = new StringBuffer(
0910: 64)
0911: .append(
0912: "Did not find a record (attributes) ")
0913: .append(key).append(" in ").append(
0914: repositoryName);
0915: getLogger().debug(debugBuffer.toString());
0916: }
0917: }
0918: } catch (SQLException sqle) {
0919: StringBuffer errorBuffer = new StringBuffer(256)
0920: .append("Error retrieving message").append(
0921: sqle.getMessage()).append(
0922: sqle.getErrorCode()).append(
0923: sqle.getSQLState()).append(
0924: sqle.getNextException());
0925: getLogger().error(errorBuffer.toString());
0926: } finally {
0927: theJDBCUtil.closeJDBCResultSet(rsMessageAttr);
0928: theJDBCUtil.closeJDBCStatement(retrieveMessageAttr);
0929: }
0930: }
0931:
0932: MailImpl mc = new MailImpl();
0933: mc.setAttributesRaw(attributes);
0934: mc.setName(key);
0935: mc.setState(rsMessage.getString(1));
0936: mc.setErrorMessage(rsMessage.getString(2));
0937: String sender = rsMessage.getString(3);
0938: if (sender == null) {
0939: mc.setSender(null);
0940: } else {
0941: mc.setSender(new MailAddress(sender));
0942: }
0943: StringTokenizer st = new StringTokenizer(rsMessage
0944: .getString(4), "\r\n", false);
0945: Set recipients = new HashSet();
0946: while (st.hasMoreTokens()) {
0947: recipients.add(new MailAddress(st.nextToken()));
0948: }
0949: mc.setRecipients(recipients);
0950: mc.setRemoteHost(rsMessage.getString(5));
0951: mc.setRemoteAddr(rsMessage.getString(6));
0952: mc.setLastUpdated(rsMessage.getTimestamp(7));
0953:
0954: MimeMessageJDBCSource source = new MimeMessageJDBCSource(
0955: this , key, sr);
0956: MimeMessageCopyOnWriteProxy message = new MimeMessageCopyOnWriteProxy(
0957: source);
0958: mc.setMessage(message);
0959: return mc;
0960: } catch (SQLException sqle) {
0961: StringBuffer errorBuffer = new StringBuffer(256).append(
0962: "Error retrieving message").append(
0963: sqle.getMessage()).append(sqle.getErrorCode())
0964: .append(sqle.getSQLState()).append(
0965: sqle.getNextException());
0966: getLogger().error(errorBuffer.toString());
0967: throw new MessagingException(
0968: "Exception while retrieving mail: "
0969: + sqle.getMessage());
0970: } catch (Exception me) {
0971: throw new MessagingException(
0972: "Exception while retrieving mail: "
0973: + me.getMessage());
0974: } finally {
0975: theJDBCUtil.closeJDBCResultSet(rsMessage);
0976: theJDBCUtil.closeJDBCStatement(retrieveMessage);
0977: theJDBCUtil.closeJDBCConnection(conn);
0978: }
0979: }
0980:
0981: /**
0982: * Removes a specified message
0983: *
0984: * @param mail the message to be removed from the repository
0985: */
0986: public void remove(Mail mail) throws MessagingException {
0987: remove(mail.getName());
0988: }
0989:
0990: /**
0991: * Removes a Collection of mails from the repository
0992: * @param mails The Collection of <code>MailImpl</code>'s to delete
0993: * @throws MessagingException
0994: * @since 2.2.0
0995: */
0996: public void remove(Collection mails) throws MessagingException {
0997: Iterator delList = mails.iterator();
0998: while (delList.hasNext()) {
0999: remove((Mail) delList.next());
1000: }
1001: }
1002:
1003: /**
1004: * Removes a message identified by a key.
1005: *
1006: * @param key the key of the message to be removed from the repository
1007: */
1008: public void remove(String key) throws MessagingException {
1009: //System.err.println("removing " + key);
1010: if (lock(key)) {
1011: Connection conn = null;
1012: PreparedStatement removeMessage = null;
1013: try {
1014: conn = datasource.getConnection();
1015: removeMessage = conn.prepareStatement(sqlQueries
1016: .getSqlString("removeMessageSQL", true));
1017: removeMessage.setString(1, key);
1018: removeMessage.setString(2, repositoryName);
1019: removeMessage.execute();
1020:
1021: if (sr != null) {
1022: sr.remove(key);
1023: }
1024: } catch (Exception me) {
1025: throw new MessagingException(
1026: "Exception while removing mail: "
1027: + me.getMessage());
1028: } finally {
1029: theJDBCUtil.closeJDBCStatement(removeMessage);
1030: theJDBCUtil.closeJDBCConnection(conn);
1031: unlock(key);
1032: }
1033: }
1034: }
1035:
1036: /**
1037: * Gets a list of message keys stored in this repository.
1038: *
1039: * @return an Iterator of the message keys
1040: */
1041: public Iterator list() throws MessagingException {
1042: //System.err.println("listing messages");
1043: Connection conn = null;
1044: PreparedStatement listMessages = null;
1045: ResultSet rsListMessages = null;
1046: try {
1047: conn = datasource.getConnection();
1048: listMessages = conn.prepareStatement(sqlQueries
1049: .getSqlString("listMessagesSQL", true));
1050: listMessages.setString(1, repositoryName);
1051: rsListMessages = listMessages.executeQuery();
1052:
1053: List messageList = new ArrayList();
1054: while (rsListMessages.next()
1055: && !Thread.currentThread().isInterrupted()) {
1056: messageList.add(rsListMessages.getString(1));
1057: }
1058: return messageList.iterator();
1059: } catch (Exception me) {
1060: throw new MessagingException(
1061: "Exception while listing mail: " + me.getMessage());
1062: } finally {
1063: theJDBCUtil.closeJDBCResultSet(rsListMessages);
1064: theJDBCUtil.closeJDBCStatement(listMessages);
1065: theJDBCUtil.closeJDBCConnection(conn);
1066: }
1067: }
1068:
1069: /**
1070: * Gets the SQL connection to be used by this JDBCMailRepository
1071: *
1072: * @return the connection
1073: * @throws SQLException if there is an issue with getting the connection
1074: */
1075: protected Connection getConnection() throws SQLException {
1076: return datasource.getConnection();
1077: }
1078:
1079: /**
1080: * @see java.lang.Object#equals(Object)
1081: */
1082: public boolean equals(Object obj) {
1083: if (!(obj instanceof JDBCMailRepository)) {
1084: return false;
1085: }
1086: // TODO: Figure out whether other instance variables should be part of
1087: // the equals equation
1088: JDBCMailRepository repository = (JDBCMailRepository) obj;
1089: return ((repository.tableName == tableName) || ((repository.tableName != null) && repository.tableName
1090: .equals(tableName)))
1091: && ((repository.repositoryName == repositoryName) || ((repository.repositoryName != null) && repository.repositoryName
1092: .equals(repositoryName)));
1093: }
1094:
1095: /**
1096: * Provide a hash code that is consistent with equals for this class
1097: *
1098: * @return the hash code
1099: */
1100: public int hashCode() {
1101: int result = 17;
1102: if (tableName != null) {
1103: result = 37 * tableName.hashCode();
1104: }
1105: if (repositoryName != null) {
1106: result = 37 * repositoryName.hashCode();
1107: }
1108: return result;
1109: }
1110:
1111: /**
1112: * This method calculates number of parameters in a prepared statement SQL String.
1113: * It does so by counting the number of '?' in the string
1114: * @param sqlstring to return parameter count for
1115: * @return number of parameters
1116: **/
1117: private int getNumberOfParameters(String sqlstring) {
1118: //it is alas a java 1.4 feature to be able to call
1119: //getParameterMetaData which could provide us with the parameterCount
1120: char[] chars = sqlstring.toCharArray();
1121: int count = 0;
1122: for (int i = 0; i < chars.length; i++) {
1123: count += chars[i] == '?' ? 1 : 0;
1124: }
1125: return count;
1126: }
1127: }
|