001: /****************************************************************
002: * Licensed to the Apache Software Foundation (ASF) under one *
003: * or more contributor license agreements. See the NOTICE file *
004: * distributed with this work for additional information *
005: * regarding copyright ownership. The ASF licenses this file *
006: * to you under the Apache License, Version 2.0 (the *
007: * "License"); you may not use this file except in compliance *
008: * with the License. You may obtain a copy of the License at *
009: * *
010: * http://www.apache.org/licenses/LICENSE-2.0 *
011: * *
012: * Unless required by applicable law or agreed to in writing, *
013: * software distributed under the License is distributed on an *
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
015: * KIND, either express or implied. See the License for the *
016: * specific language governing permissions and limitations *
017: * under the License. *
018: ****************************************************************/package org.apache.james.nntpserver.repository;
019:
020: import org.apache.avalon.framework.activity.Initializable;
021: import org.apache.avalon.framework.configuration.Configurable;
022: import org.apache.avalon.framework.configuration.Configuration;
023: import org.apache.avalon.framework.configuration.ConfigurationException;
024: import org.apache.avalon.framework.context.Context;
025: import org.apache.avalon.framework.context.ContextException;
026: import org.apache.avalon.framework.context.Contextualizable;
027: import org.apache.avalon.framework.logger.AbstractLogEnabled;
028: import org.apache.avalon.framework.logger.LogEnabled;
029: import org.apache.james.context.AvalonContextUtilities;
030: import org.apache.james.util.Lock;
031: import org.apache.james.util.io.IOUtil;
032:
033: import javax.mail.internet.MimeMessage;
034: import java.io.BufferedReader;
035: import java.io.File;
036: import java.io.FileInputStream;
037: import java.io.FileOutputStream;
038: import java.io.InputStreamReader;
039: import java.util.Properties;
040: import java.util.StringTokenizer;
041:
042: /**
043: * Processes entries and sends to appropriate groups.
044: * Eats up inappropriate entries.
045: *
046: */
047: class NNTPSpooler extends AbstractLogEnabled implements
048: Contextualizable, Configurable, Initializable {
049:
050: /**
051: * The spooler context
052: */
053: private Context context;
054:
055: /**
056: * The array of spooler runnables, each associated with a Worker thread
057: */
058: private SpoolerRunnable[] worker;
059:
060: /**
061: * The directory containing entries to be spooled.
062: */
063: private File spoolPath;
064:
065: /**
066: * The String form of the spool directory.
067: */
068: private String spoolPathString;
069:
070: /**
071: * The time the spooler threads sleep between processing
072: */
073: private int threadIdleTime = 0;
074:
075: /**
076: * @see org.apache.avalon.framework.context.Contextualizable#contextualize(Context)
077: */
078: public void contextualize(final Context context)
079: throws ContextException {
080: this .context = context;
081: }
082:
083: /**
084: * @see org.apache.avalon.framework.configuration.Configurable#configure(Configuration)
085: */
086: public void configure(Configuration configuration)
087: throws ConfigurationException {
088: int threadCount = configuration.getChild("threadCount")
089: .getValueAsInteger(1);
090: threadIdleTime = configuration.getChild("threadIdleTime")
091: .getValueAsInteger(60 * 1000);
092: spoolPathString = configuration.getChild("spoolPath")
093: .getValue();
094: worker = new SpoolerRunnable[threadCount];
095: }
096:
097: /**
098: * @see org.apache.avalon.framework.activity.Initializable#initialize()
099: */
100: public void initialize() throws Exception {
101: //System.out.println(getClass().getName()+": init");
102:
103: try {
104: spoolPath = AvalonContextUtilities.getFile(context,
105: spoolPathString);
106: if (spoolPath.exists() == false) {
107: spoolPath.mkdirs();
108: } else if (!(spoolPath.isDirectory())) {
109: StringBuffer errorBuffer = new StringBuffer(128)
110: .append(
111: "Spool directory is improperly configured. The specified path ")
112: .append(spoolPathString).append(
113: " is not a directory.");
114: throw new ConfigurationException(errorBuffer.toString());
115: }
116: } catch (Exception e) {
117: getLogger().fatalError(e.getMessage(), e);
118: throw e;
119: }
120:
121: for (int i = 0; i < worker.length; i++) {
122: worker[i] = new SpoolerRunnable(threadIdleTime, spoolPath);
123: if (worker[i] instanceof LogEnabled) {
124: ((LogEnabled) worker[i]).enableLogging(getLogger());
125: }
126: }
127:
128: // TODO: Replace this with a standard Avalon thread pool
129: for (int i = 0; i < worker.length; i++) {
130: new Thread(worker[i], "NNTPSpool-" + i).start();
131: }
132: }
133:
134: /**
135: * Sets the repository used by this spooler.
136: *
137: * @param repo the repository to be used
138: */
139: void setRepository(NNTPRepository repo) {
140: for (int i = 0; i < worker.length; i++) {
141: worker[i].setRepository(repo);
142: }
143: }
144:
145: /**
146: * Sets the article id repository used by this spooler.
147: *
148: * @param articleIDRepo the article id repository to be used
149: */
150: void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
151: for (int i = 0; i < worker.length; i++) {
152: worker[i].setArticleIDRepository(articleIDRepo);
153: }
154: }
155:
156: /**
157: * Returns (and creates, if the directory doesn't already exist) the
158: * spool directory
159: *
160: * @return the spool directory
161: */
162: File getSpoolPath() {
163: return spoolPath;
164: }
165:
166: /**
167: * A static inner class that provides the body for the spool
168: * threads.
169: */
170: static class SpoolerRunnable extends AbstractLogEnabled implements
171: Runnable {
172:
173: private static final Lock lock = new Lock();
174:
175: /**
176: * The directory containing entries to be spooled.
177: */
178: private final File spoolPath;
179:
180: /**
181: * The time the spooler thread sleeps between processing
182: */
183: private final int threadIdleTime;
184:
185: /**
186: * The article ID repository used by this spooler thread
187: */
188: private ArticleIDRepository articleIDRepo;
189:
190: /**
191: * The NNTP repository used by this spooler thread
192: */
193: private NNTPRepository repo;
194:
195: SpoolerRunnable(int threadIdleTime, File spoolPath) {
196: this .threadIdleTime = threadIdleTime;
197: this .spoolPath = spoolPath;
198: }
199:
200: /**
201: * Sets the article id repository used by this spooler thread.
202: *
203: * @param articleIDRepo the article id repository to be used
204: */
205: void setArticleIDRepository(ArticleIDRepository articleIDRepo) {
206: this .articleIDRepo = articleIDRepo;
207: }
208:
209: /**
210: * Sets the repository used by this spooler thread.
211: *
212: * @param repo the repository to be used
213: */
214: void setRepository(NNTPRepository repo) {
215: this .repo = repo;
216: }
217:
218: /**
219: * The threads race to grab a lock. if a thread wins it processes the article,
220: * if it loses it tries to lock and process the next article.
221: */
222: public void run() {
223: getLogger().debug(
224: Thread.currentThread().getName()
225: + " is the NNTP spooler thread.");
226: try {
227: while (Thread.currentThread().interrupted() == false) {
228: String[] list = spoolPath.list();
229: if (list.length > 0)
230: getLogger().debug(
231: "Files to process: " + list.length);
232: for (int i = 0; i < list.length; i++) {
233: if (lock.lock(list[i])) {
234: File f = new File(spoolPath, list[i])
235: .getAbsoluteFile();
236: getLogger().debug(
237: "Processing file: "
238: + f.getAbsolutePath());
239: try {
240: process(f);
241: } catch (Throwable ex) {
242: getLogger().debug(
243: "Exception occured while processing file: "
244: + f.getAbsolutePath(),
245: ex);
246: } finally {
247: lock.unlock(list[i]);
248: }
249: }
250: list[i] = null; // release the string entry;
251: }
252: list = null; // release the array;
253: // this is good for other non idle threads
254: try {
255: Thread.currentThread().sleep(threadIdleTime);
256: } catch (InterruptedException ex) {
257: // Ignore and continue
258: }
259: }
260: } finally {
261: Thread.currentThread().interrupted();
262: }
263: }
264:
265: /**
266: * Process a file stored in the spool.
267: *
268: * @param f the spool file being processed
269: */
270: private void process(File spoolFile) throws Exception {
271: StringBuffer logBuffer = new StringBuffer(160).append(
272: "process: ").append(spoolFile.getAbsolutePath())
273: .append(",").append(spoolFile.getCanonicalPath());
274: getLogger().debug(logBuffer.toString());
275: final MimeMessage msg;
276: String articleID;
277: // TODO: Why is this a block?
278: { // Get the message for copying to destination groups.
279: FileInputStream fin = new FileInputStream(spoolFile);
280: try {
281: msg = new MimeMessage(null, fin);
282: } finally {
283: IOUtil.shutdownStream(fin);
284: }
285:
286: String lineCount = null;
287: String[] lineCountHeader = msg.getHeader("Lines");
288: if (lineCountHeader == null
289: || lineCountHeader.length == 0) {
290: BufferedReader rdr = new BufferedReader(
291: new InputStreamReader(msg.getDataHandler()
292: .getInputStream()));
293: int lines = 0;
294: while (rdr.readLine() != null) {
295: lines++;
296: }
297:
298: lineCount = Integer.toString(lines);
299: rdr.close();
300:
301: msg.setHeader("Lines", lineCount);
302: }
303:
304: // ensure no duplicates exist.
305: String[] idheader = msg.getHeader("Message-Id");
306: articleID = ((idheader != null && (idheader.length > 0)) ? idheader[0]
307: : null);
308: if ((articleID != null)
309: && (articleIDRepo.isExists(articleID))) {
310: getLogger().debug(
311: "Message already exists: " + articleID);
312: if (spoolFile.delete() == false)
313: getLogger().error(
314: "Could not delete duplicate message from spool: "
315: + spoolFile.getAbsolutePath());
316: return;
317: }
318: if (articleID == null || lineCount != null) {
319: if (articleID == null) {
320: articleID = articleIDRepo.generateArticleID();
321: msg.setHeader("Message-Id", articleID);
322: }
323: FileOutputStream fout = new FileOutputStream(
324: spoolFile);
325: try {
326: msg.writeTo(fout);
327: } finally {
328: IOUtil.shutdownStream(fout);
329: }
330: }
331: }
332:
333: String[] headers = msg.getHeader("Newsgroups");
334: Properties prop = new Properties();
335: if (headers != null) {
336: for (int i = 0; i < headers.length; i++) {
337: StringTokenizer tokenizer = new StringTokenizer(
338: headers[i], ",");
339: while (tokenizer.hasMoreTokens()) {
340: String groupName = tokenizer.nextToken().trim();
341: getLogger().debug(
342: "Copying message to group: "
343: + groupName);
344: NNTPGroup group = repo.getGroup(groupName);
345: if (group == null) {
346: getLogger().error(
347: "Couldn't add article with article ID "
348: + articleID + " to group "
349: + groupName
350: + " - group not found.");
351: continue;
352: }
353:
354: FileInputStream newsStream = new FileInputStream(
355: spoolFile);
356: try {
357: NNTPArticle article = group
358: .addArticle(newsStream);
359: prop.setProperty(group.getName(), article
360: .getArticleNumber()
361: + "");
362: } finally {
363: IOUtil.shutdownStream(newsStream);
364: }
365: }
366: }
367: }
368: articleIDRepo.addArticle(articleID, prop);
369: boolean delSuccess = spoolFile.delete();
370: if (delSuccess == false) {
371: getLogger().error(
372: "Could not delete file: "
373: + spoolFile.getAbsolutePath());
374: }
375: }
376: } // class SpoolerRunnable
377: }
|