001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.servicemix.mail;
018:
019: import java.util.Properties;
020:
021: import javax.jbi.JBIException;
022: import javax.jbi.messaging.ExchangeStatus;
023: import javax.jbi.messaging.InOnly;
024: import javax.jbi.messaging.MessageExchange;
025: import javax.jbi.messaging.NormalizedMessage;
026: import javax.mail.Flags;
027: import javax.mail.Folder;
028: import javax.mail.Session;
029: import javax.mail.Store;
030: import javax.mail.internet.MimeMessage;
031: import javax.mail.internet.ParseException;
032:
033: import org.apache.commons.logging.Log;
034: import org.apache.commons.logging.LogFactory;
035:
036: import org.apache.servicemix.common.endpoints.PollingEndpoint;
037:
038: import org.apache.servicemix.mail.marshaler.AbstractMailMarshaler;
039: import org.apache.servicemix.mail.marshaler.DefaultMailMarshaler;
040: import org.apache.servicemix.mail.utils.MailConnectionConfiguration;
041: import org.apache.servicemix.mail.utils.MailUtils;
042:
043: /**
044: * This is the polling endpoint for the mail component.
045: *
046: * @org.apache.xbean.XBean element="poller"
047: * @author lhein
048: */
049: public class MailPollerEndpoint extends PollingEndpoint implements
050: MailEndpointType {
051: private static final transient Log LOG = LogFactory
052: .getLog(MailPollerEndpoint.class);
053:
054: private AbstractMailMarshaler marshaler = new DefaultMailMarshaler();
055: private String customTrustManagers;
056: private MailConnectionConfiguration config;
057:
058: private String connection;
059:
060: private int maxFetchSize = 5;
061:
062: private boolean processOnlyUnseenMessages;
063: private boolean deleteProcessedMessages;
064: private boolean debugMode;
065:
066: /**
067: * default constructor
068: */
069: public MailPollerEndpoint() {
070: this .processOnlyUnseenMessages = true;
071: this .deleteProcessedMessages = false;
072: this .debugMode = false;
073: }
074:
075: /*
076: * (non-Javadoc)
077: *
078: * @see org.apache.servicemix.common.endpoints.ConsumerEndpoint#getLocationURI()
079: */
080: @Override
081: public String getLocationURI() {
082: // return a URI that unique identify this endpoint
083: return getService() + "#" + getEndpoint();
084: }
085:
086: /*
087: * (non-Javadoc)
088: *
089: * @see org.apache.servicemix.common.ExchangeProcessor#process(javax.jbi.messaging.MessageExchange)
090: */
091: public void process(MessageExchange arg0) throws Exception {
092: // Do nothing. In our case, this method should never be called
093: // as we only send synchronous InOnly exchange
094: }
095:
096: /*
097: * (non-Javadoc)
098: *
099: * @see org.apache.servicemix.components.util.PollingComponentSupport#poll()
100: */
101: public void poll() throws Exception {
102: LOG.debug("Polling mailfolder " + config.getFolderName()
103: + " at host " + config.getHost() + "...");
104:
105: Store store = null;
106: Folder folder = null;
107: Session session = null;
108: try {
109: Properties props = MailUtils.getPropertiesForProtocol(
110: this .config, this .customTrustManagers);
111: props.put("mail.debug", isDebugMode() ? "true" : "false");
112:
113: // Get session
114: session = Session.getInstance(props, config
115: .getAuthenticator());
116:
117: // debug the session
118: session.setDebug(this .debugMode);
119:
120: store = session.getStore(config.getProtocol());
121: store.connect(config.getHost(), config.getUsername(),
122: config.getPassword());
123: folder = store.getFolder(config.getFolderName());
124: if (folder == null || !folder.exists()) {
125: throw new Exception("Folder not found or invalid: "
126: + config.getFolderName());
127: }
128: folder.open(Folder.READ_WRITE);
129:
130: int msgCount = 0;
131: // check for max fetch size
132: if (this .maxFetchSize == -1) {
133: // -1 means no restrictions at all - so poll all messages
134: msgCount = folder.getMessageCount();
135: } else {
136: // poll only the set max fetch size
137: msgCount = Math.min(this .maxFetchSize, folder
138: .getMessageCount());
139: }
140:
141: for (int i = 1; i <= msgCount; i++) {
142: // get the message
143: MimeMessage mailMsg = (MimeMessage) folder
144: .getMessage(i);
145:
146: // check if the message may be processed
147: if (isProcessOnlyUnseenMessages()
148: && mailMsg.isSet(Flags.Flag.SEEN)) {
149: // this message should not be processed because
150: // the configuration says to process only unseen messages
151: LOG.debug("Skipped seen mail: "
152: + mailMsg.getSubject());
153: continue;
154: }
155:
156: // create a inOnly exchange
157: InOnly io = getExchangeFactory().createInOnlyExchange();
158:
159: // configure the exchange target
160: configureExchangeTarget(io);
161:
162: // create the in message
163: NormalizedMessage normalizedMessage = io
164: .createMessage();
165:
166: // now let the marshaller convert the mail into a normalized
167: // message to send to jbi bus
168: marshaler.convertMailToJBI(io, normalizedMessage,
169: mailMsg);
170:
171: // then put the in message into the inOnly exchange
172: io.setInMessage(normalizedMessage);
173:
174: // and use sendSync to deliver it
175: sendSync(io);
176:
177: // now check if delivery succeeded or went wrong
178: if (io.getStatus() == ExchangeStatus.ERROR) {
179: Exception e = io.getError();
180: if (e == null) {
181: e = new JBIException("Unexpected error: "
182: + e.getMessage());
183: }
184: throw e;
185: } else {
186: // then mark the mail as processed (only if no errors)
187: if (deleteProcessedMessages) {
188: // processed messages have to be marked as deleted
189: mailMsg.setFlag(Flags.Flag.DELETED, true);
190: } else {
191: // processed messages have to be marked as seen
192: mailMsg.setFlag(Flags.Flag.SEEN, true);
193: }
194: }
195: }
196: } finally {
197: // finally clean up and close the folder and store
198: try {
199: if (folder != null) {
200: folder.close(true);
201: }
202: if (store != null) {
203: store.close();
204: }
205: } catch (Exception ignored) {
206: logger.debug(ignored);
207: }
208: }
209: }
210:
211: /**
212: * @return the deleteProcessedMessages
213: */
214: public boolean isDeleteProcessedMessages() {
215: return this .deleteProcessedMessages;
216: }
217:
218: /**
219: * @param deleteProcessedMessages the deleteProcessedMessages to set
220: */
221: public void setDeleteProcessedMessages(
222: boolean deleteProcessedMessages) {
223: this .deleteProcessedMessages = deleteProcessedMessages;
224: }
225:
226: /**
227: * @return the marshaler
228: */
229: public AbstractMailMarshaler getMarshaler() {
230: return this .marshaler;
231: }
232:
233: /**
234: * @param marshaler the marshaler to set
235: */
236: public void setMarshaler(AbstractMailMarshaler marshaler) {
237: this .marshaler = marshaler;
238: }
239:
240: /**
241: * @return the maxFetchSize
242: */
243: public int getMaxFetchSize() {
244: return this .maxFetchSize;
245: }
246:
247: /**
248: * @param maxFetchSize the maxFetchSize to set
249: */
250: public void setMaxFetchSize(int maxFetchSize) {
251: this .maxFetchSize = maxFetchSize;
252: }
253:
254: /**
255: * @return the processOnlyUnseenMessages
256: */
257: public boolean isProcessOnlyUnseenMessages() {
258: return this .processOnlyUnseenMessages;
259: }
260:
261: /**
262: * @param processOnlyUnseenMessages the processOnlyUnseenMessages to set
263: */
264: public void setProcessOnlyUnseenMessages(
265: boolean processOnlyUnseenMessages) {
266: this .processOnlyUnseenMessages = processOnlyUnseenMessages;
267: }
268:
269: /**
270: * returns the connection uri used for this poller endpoint
271: *
272: * @return Returns the connection.
273: */
274: public String getConnection() {
275: return this .connection;
276: }
277:
278: /**
279: * sets the connection uri
280: *
281: * @param connection The connection to set.
282: */
283: public void setConnection(String connection) {
284: this .connection = connection;
285: try {
286: this .config = MailUtils.configure(this .connection);
287: } catch (ParseException ex) {
288: LOG.error("The configured connection uri is invalid", ex);
289: }
290: }
291:
292: /**
293: * @return the debugMode
294: */
295: public boolean isDebugMode() {
296: return this .debugMode;
297: }
298:
299: /**
300: * @param debugMode the debugMode to set
301: */
302: public void setDebugMode(boolean debugMode) {
303: this .debugMode = debugMode;
304: }
305:
306: /**
307: * @return the customTrustManagers
308: */
309: public String getCustomTrustManagers() {
310: return this .customTrustManagers;
311: }
312:
313: /**
314: * @param customTrustManagers the customTrustManagers to set
315: */
316: public void setCustomTrustManagers(String customTrustManagers) {
317: this.customTrustManagers = customTrustManagers;
318: }
319: }
|