001: /* $Id: WriterPoolMember.java 5032 2007-04-02 22:02:14Z gojomo $
002: *
003: * Created on July 21st, 2006
004: *
005: * Copyright (C) 2006 Internet Archive.
006: *
007: * This file is part of the Heritrix web crawler (crawler.archive.org).
008: *
009: * Heritrix is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU Lesser Public License as published by
011: * the Free Software Foundation; either version 2.1 of the License, or
012: * any later version.
013: *
014: * Heritrix is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
017: * GNU Lesser Public License for more details.
018: *
019: * You should have received a copy of the GNU Lesser Public License
020: * along with Heritrix; if not, write to the Free Software
021: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
022: */
023: package org.archive.io;
024:
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import java.util.zip.GZIPOutputStream;

import org.archive.util.ArchiveUtils;
import org.archive.util.IoUtils;
import org.archive.util.TimestampSerialno;
043:
044: /**
045: * Member of {@link WriterPool}.
046: * Implements rotating off files, file naming with some guarantee of
047: * uniqueness, and position in file. Subclass to pick up functionality for a
048: * particular Writer type.
049: * @author stack
050: * @version $Date: 2007-04-02 22:02:14 +0000 (Mon, 02 Apr 2007) $ $Revision: 5032 $
051: */
052: public abstract class WriterPoolMember implements ArchiveFileConstants {
053: private final Logger logger = Logger.getLogger(this .getClass()
054: .getName());
055:
056: public static final String UTF8 = "UTF-8";
057:
058: /**
059: * Default file prefix.
060: *
061: * Stands for Internet Archive Heritrix.
062: */
063: public static final String DEFAULT_PREFIX = "IAH";
064:
065: /**
066: * Value to interpolate with actual hostname.
067: */
068: public static final String HOSTNAME_VARIABLE = "${HOSTNAME}";
069:
070: /**
071: * Default for file suffix.
072: */
073: public static final String DEFAULT_SUFFIX = HOSTNAME_VARIABLE;
074:
075: /**
076: * Reference to file we're currently writing.
077: */
078: private File f = null;
079:
080: /**
081: * Output stream for file.
082: */
083: private OutputStream out = null;
084:
085: /**
086: * File output stream.
087: * This is needed so can get at channel to find current position in file.
088: */
089: private FileOutputStream fos;
090:
091: private final boolean compressed;
092: private List<File> writeDirs = null;
093: private String prefix = DEFAULT_PREFIX;
094: private String suffix = DEFAULT_SUFFIX;
095: private final long maxSize;
096: private final String extension;
097:
098: /**
099: * Creation date for the current file.
100: * Set by {@link #createFile()}.
101: */
102: private String createTimestamp = "UNSET!!!";
103:
104: /**
105: * A running sequence used making unique file names.
106: */
107: final private AtomicInteger serialNo;
108:
109: /**
110: * Directories round-robin index.
111: */
112: private static int roundRobinIndex = 0;
113:
114: /**
115: * NumberFormat instance for formatting serial number.
116: *
117: * Pads serial number with zeros.
118: */
119: private static NumberFormat serialNoFormatter = new DecimalFormat(
120: "00000");
121:
122: /**
123: * Constructor.
124: * Takes a stream. Use with caution. There is no upperbound check on size.
125: * Will just keep writing.
126: *
127: * @param serialNo used to create unique filename sequences
128: * @param out Where to write.
129: * @param file File the <code>out</code> is connected to.
130: * @param cmprs Compress the content written.
131: * @param a14DigitDate If null, we'll write current time.
132: * @throws IOException
133: */
134: protected WriterPoolMember(AtomicInteger serialNo,
135: final OutputStream out, final File file,
136: final boolean cmprs, String a14DigitDate)
137: throws IOException {
138: this (serialNo, null, null, cmprs, -1, null);
139: this .out = out;
140: this .f = file;
141: }
142:
143: /**
144: * Constructor.
145: *
146: * @param serialNo used to create unique filename sequences
147: * @param dirs Where to drop files.
148: * @param prefix File prefix to use.
149: * @param cmprs Compress the records written.
150: * @param maxSize Maximum size for ARC files written.
151: * @param extension Extension to give file.
152: */
153: public WriterPoolMember(AtomicInteger serialNo,
154: final List<File> dirs, final String prefix,
155: final boolean cmprs, final long maxSize,
156: final String extension) {
157: this (serialNo, dirs, prefix, "", cmprs, maxSize, extension);
158: }
159:
160: /**
161: * Constructor.
162: *
163: * @param serialNo used to create unique filename sequences
164: * @param dirs Where to drop files.
165: * @param prefix File prefix to use.
166: * @param cmprs Compress the records written.
167: * @param maxSize Maximum size for ARC files written.
168: * @param suffix File tail to use. If null, unused.
169: * @param extension Extension to give file.
170: */
171: public WriterPoolMember(AtomicInteger serialNo,
172: final List<File> dirs, final String prefix,
173: final String suffix, final boolean cmprs,
174: final long maxSize, final String extension) {
175: this .suffix = suffix;
176: this .prefix = prefix;
177: this .maxSize = maxSize;
178: this .writeDirs = dirs;
179: this .compressed = cmprs;
180: this .extension = extension;
181: this .serialNo = serialNo;
182: }
183:
184: /**
185: * Call this method just before/after any significant write.
186: *
187: * Call at the end of the writing of a record or just before we start
188: * writing a new record. Will close current file and open a new file
189: * if file size has passed out maxSize.
190: *
191: * <p>Creates and opens a file if none already open. One use of this method
192: * then is after construction, call this method to add the metadata, then
193: * call {@link #getPosition()} to find offset of first record.
194: *
195: * @exception IOException
196: */
197: public void checkSize() throws IOException {
198: if (this .out == null
199: || (this .maxSize != -1 && (this .f.length() > this .maxSize))) {
200: createFile();
201: }
202: }
203:
204: /**
205: * Create a new file.
206: * Rotates off the current Writer and creates a new in its place
207: * to take subsequent writes. Usually called from {@link #checkSize()}.
208: * @return Name of file created.
209: * @throws IOException
210: */
211: protected String createFile() throws IOException {
212: TimestampSerialno tsn = getTimestampSerialNo();
213: String name = this .prefix
214: + '-'
215: + getUniqueBasename(tsn)
216: + ((this .suffix == null || this .suffix.length() <= 0) ? ""
217: : "-" + this .suffix)
218: + '.'
219: + this .extension
220: + ((this .compressed) ? '.' + COMPRESSED_FILE_EXTENSION
221: : "") + OCCUPIED_SUFFIX;
222: this .createTimestamp = tsn.getTimestamp();
223: File dir = getNextDirectory(this .writeDirs);
224: return createFile(new File(dir, name));
225: }
226:
227: protected String createFile(final File file) throws IOException {
228: close();
229: this .f = file;
230: this .fos = new FileOutputStream(this .f);
231: this .out = new FastBufferedOutputStream(this .fos);
232: logger.info("Opened " + this .f.getAbsolutePath());
233: return this .f.getName();
234: }
235:
236: /**
237: * @param dirs List of File objects that point at directories.
238: * @return Find next directory to write an arc too. If more
239: * than one, it tries to round-robin through each in turn.
240: * @throws IOException
241: */
242: protected File getNextDirectory(List<File> dirs) throws IOException {
243: if (WriterPoolMember.roundRobinIndex >= dirs.size()) {
244: WriterPoolMember.roundRobinIndex = 0;
245: }
246: File d = null;
247: try {
248: d = checkWriteable((File) dirs
249: .get(WriterPoolMember.roundRobinIndex));
250: } catch (IndexOutOfBoundsException e) {
251: // Dirs list might be altered underneath us.
252: // If so, we get this exception -- just keep on going.
253: }
254: if (d == null && dirs.size() > 1) {
255: for (Iterator i = dirs.iterator(); d == null && i.hasNext();) {
256: d = checkWriteable((File) i.next());
257: }
258: } else {
259: WriterPoolMember.roundRobinIndex++;
260: }
261: if (d == null) {
262: throw new IOException("Directories unusable.");
263: }
264: return d;
265: }
266:
267: protected File checkWriteable(File d) {
268: if (d == null) {
269: return d;
270: }
271:
272: try {
273: IoUtils.ensureWriteableDirectory(d);
274: } catch (IOException e) {
275: logger.warning("Directory " + d.getPath() + " is not"
276: + " writeable or cannot be created: "
277: + e.getMessage());
278: d = null;
279: }
280: return d;
281: }
282:
283: protected synchronized TimestampSerialno getTimestampSerialNo() {
284: return getTimestampSerialNo(null);
285: }
286:
287: /**
288: * Do static synchronization around getting of counter and timestamp so
289: * no chance of a thread getting in between the getting of timestamp and
290: * allocation of serial number throwing the two out of alignment.
291: *
292: * @param timestamp If non-null, use passed timestamp (must be 14 digit
293: * ARC format), else if null, timestamp with now.
294: * @return Instance of data structure that has timestamp and serial no.
295: */
296: protected synchronized TimestampSerialno getTimestampSerialNo(
297: final String timestamp) {
298: return new TimestampSerialno((timestamp != null) ? timestamp
299: : ArchiveUtils.get14DigitDate(), serialNo
300: .getAndIncrement());
301: }
302:
303: /**
304: * Return a unique basename.
305: *
306: * Name is timestamp + an every increasing sequence number.
307: *
308: * @param tsn Structure with timestamp and serial number.
309: *
310: * @return Unique basename.
311: */
312: private String getUniqueBasename(TimestampSerialno tsn) {
313: return tsn.getTimestamp()
314: + "-"
315: + WriterPoolMember.serialNoFormatter.format(tsn
316: .getSerialNumber());
317: }
318:
319: /**
320: * Get the file name
321: *
322: * @return the filename, as if uncompressed
323: */
324: protected String getBaseFilename() {
325: String name = this .f.getName();
326: if (this .compressed
327: && name.endsWith(DOT_COMPRESSED_FILE_EXTENSION)) {
328: return name.substring(0, name.length() - 3);
329: } else if (this .compressed
330: && name.endsWith(DOT_COMPRESSED_FILE_EXTENSION
331: + OCCUPIED_SUFFIX)) {
332: return name.substring(0, name.length()
333: - (3 + OCCUPIED_SUFFIX.length()));
334: } else {
335: return name;
336: }
337: }
338:
339: /**
340: * Get this file.
341: *
342: * Used by junit test to test for creation and when {@link WriterPool} wants
343: * to invalidate a file.
344: *
345: * @return The current file.
346: */
347: public File getFile() {
348: return this .f;
349: }
350:
351: /**
352: * Post write tasks.
353: *
354: * Has side effects. Will open new file if we're at the upperbound.
355: * If we're writing compressed files, it will wrap output stream with a
356: * GZIP writer with side effect that GZIP header is written out on the
357: * stream.
358: *
359: * @exception IOException
360: */
361: protected void preWriteRecordTasks() throws IOException {
362: checkSize();
363: if (this .compressed) {
364: // Wrap stream in GZIP Writer.
365: // The below construction immediately writes the GZIP 'default'
366: // header out on the underlying stream.
367: this .out = new CompressedStream(this .out);
368: }
369: }
370:
371: /**
372: * Post file write tasks.
373: * If compressed, finishes up compression and flushes stream so any
374: * subsequent checks get good reading.
375: *
376: * @exception IOException
377: */
378: protected void postWriteRecordTasks() throws IOException {
379: if (this .compressed) {
380: CompressedStream o = (CompressedStream) this .out;
381: o.finish();
382: o.flush();
383: this .out = o.getWrappedStream();
384: }
385: }
386:
387: /**
388: * Postion in current physical file.
389: * Used making accounting of bytes written.
390: * @return Position in underlying file. Call before or after writing
391: * records *only* to be safe.
392: * @throws IOException
393: */
394: public long getPosition() throws IOException {
395: long position = 0;
396: if (this .out != null) {
397: this .out.flush();
398: }
399: if (this .fos != null) {
400: // Call flush on underlying file though probably not needed assuming
401: // above this.out.flush called through to this.fos.
402: this .fos.flush();
403: position = this .fos.getChannel().position();
404: }
405: return position;
406: }
407:
408: public boolean isCompressed() {
409: return compressed;
410: }
411:
412: protected void write(final byte[] b) throws IOException {
413: this .out.write(b);
414: }
415:
416: protected void flush() throws IOException {
417: this .out.flush();
418: }
419:
420: protected void write(byte[] b, int off, int len) throws IOException {
421: this .out.write(b, off, len);
422: }
423:
424: protected void write(int b) throws IOException {
425: this .out.write(b);
426: }
427:
428: protected void readFullyFrom(final InputStream is,
429: final long recordLength, final byte[] b) throws IOException {
430: int read = b.length;
431: int total = 0;
432: while ((read = is.read(b)) != -1 && total < recordLength) {
433: total += read;
434: write(b, 0, read);
435: }
436: if (total != recordLength) {
437: throw new IOException("Read " + total + " but expected "
438: + recordLength);
439: }
440: }
441:
442: protected void readToLimitFrom(final InputStream is,
443: final long limit, final byte[] b) throws IOException {
444: int read = b.length;
445: long total = 0;
446: while ((read = is.read(b, 0, (int) Math.min(b.length,
447: (limit - total)))) != -1
448: && total < limit) {
449: total += read;
450: write(b, 0, read);
451: }
452: if (total != limit) {
453: throw new IOException("Read " + total + " but expected "
454: + limit);
455: }
456: }
457:
458: public void close() throws IOException {
459: if (this .out == null) {
460: return;
461: }
462: this .out.close();
463: this .out = null;
464: this .fos = null;
465: if (this .f != null && this .f.exists()) {
466: String path = this .f.getAbsolutePath();
467: if (path.endsWith(OCCUPIED_SUFFIX)) {
468: File f = new File(path.substring(0, path.length()
469: - OCCUPIED_SUFFIX.length()));
470: if (!this .f.renameTo(f)) {
471: logger.warning("Failed rename of " + path);
472: }
473: this .f = f;
474: }
475:
476: logger.info("Closed " + this .f.getAbsolutePath()
477: + ", size " + this .f.length());
478: }
479: }
480:
481: protected OutputStream getOutputStream() {
482: return this .out;
483: }
484:
485: protected String getCreateTimestamp() {
486: return createTimestamp;
487: }
488:
489: /**
490: * An override so we get access to underlying output stream.
491: * @author stack
492: */
493: private class CompressedStream extends GZIPOutputStream {
494: public CompressedStream(OutputStream out) throws IOException {
495: super (out);
496: }
497:
498: /**
499: * @return Reference to stream being compressed.
500: */
501: OutputStream getWrappedStream() {
502: return this.out;
503: }
504: }
505: }
|