001: /*
002: * <copyright>
003: *
004: * Copyright 1997-2004 BBNT Solutions, LLC
005: * under sponsorship of the Defense Advanced Research Projects
006: * Agency (DARPA).
007: *
008: * You can redistribute this software and/or modify it under the
009: * terms of the Cougaar Open Source License as published on the
010: * Cougaar Open Source Website (www.cougaar.org).
011: *
012: * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
013: * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
014: * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
015: * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
016: * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
017: * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
018: * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
019: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
020: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
021: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
022: * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
023: *
024: * </copyright>
025: */
026:
027: package org.cougaar.core.persist;
028:
029: import java.io.File;
030: import java.io.FileInputStream;
031: import java.io.FileNotFoundException;
032: import java.io.FileOutputStream;
033: import java.io.IOException;
034: import java.io.InputStream;
035: import java.io.OutputStream;
036: import java.util.ArrayList;
037: import java.util.List;
038:
039: import org.cougaar.util.CircularQueue;
040: import org.cougaar.util.log.Logger;
041:
042: /**
043: * Support for buffered and otherwise queued access to the
044: * file system. Primarily, this serializes access to writing and
045: * renaming and reading files. All file write and rename actions
046: * return immediately without waiting for the actual filesystem
047: * operations to complete. File reads invariably block until prior
048: * writes and renames have concluded.
049: */
050: public class BufferedFileSystem implements Runnable {
051: private static final int BUFSIZE = 100000;
052: private static final int MAXBUFFERS = 100; // 10 Mbytes max
053: private static final int MAXKEPTBUFFERS = 20; // 2 Mbytes max
054:
055: private static List buffers = new ArrayList();
056: private static int totalBuffers = 0;
057:
058: /**
059: * Get a buffer for writing. This is a bit complicated to avoid
060: * excessive allocation activity and excessive consumption of memory
061: * for this purpose. Up to MAXKEPTBUFFERS are allocated and reused
062: * freely. If demand exceeds MAXKEPTBUFFERS, additional buffers are
063: * allocated up to MAXBUFFERS. Demand in excess of MAXBUFFERS blocks.
064: */
065: private static byte[] getBuffer() {
066: synchronized (buffers) {
067: while (totalBuffers >= MAXBUFFERS) {
068: try {
069: buffers.wait();
070: } catch (InterruptedException ie) {
071: }
072: }
073: int len = buffers.size();
074: if (len > 0) {
075: return (byte[]) buffers.remove(len - 1);
076: }
077: totalBuffers++;
078: return new byte[BUFSIZE];
079: }
080: }
081:
082: private static void releaseBuffer(byte[] buf) {
083: synchronized (buffers) {
084: if (buffers.size() < MAXKEPTBUFFERS) {
085: buffers.add(buf);
086: } else {
087: totalBuffers--;
088: }
089: buffers.notifyAll();
090: }
091: }
092:
093: private Logger logger;
094:
095: private Thread thread;
096:
097: private boolean active;
098:
099: private CircularQueue queue = new CircularQueue();
100: private boolean executingJob = false;
101:
102: /**
103: * Wrap a FileOutputStream to provide safe close semantics.
104: * Explicitly sync the file descriptor on close() to insure the file
105: * has been completely written to the disk.
106: */
107: private class BufferedFileOutputStream extends OutputStream {
108: // The real OutputStream
109: private FileOutputStream fileOutputStream;
110: private byte[] buffer;
111: private int nbytes;
112:
113: public BufferedFileOutputStream(FileOutputStream stream) {
114: fileOutputStream = stream;
115: newBuffer();
116: }
117:
118: private void newBuffer() {
119: buffer = getBuffer();
120: nbytes = 0;
121: }
122:
123: private void switchBuffer(final int nbytes) {
124: enqueueJob(new Runnable() {
125: byte[] buf = buffer;
126:
127: public void run() {
128: try {
129: fileOutputStream.write(buf, 0, nbytes);
130: } catch (IOException ioe) {
131: throw new BufferedFileException(ioe);
132: } finally {
133: releaseBuffer(buf);
134: }
135: }
136:
137: public String toString() {
138: return "Write " + nbytes;
139: }
140: });
141: newBuffer();
142: }
143:
144: public void write(int b) throws IOException {
145: buffer[nbytes++] = (byte) b;
146: if (nbytes == BUFSIZE)
147: switchBuffer(nbytes);
148: }
149:
150: public void write(byte[] b, int offset, int nb)
151: throws IOException {
152: while (nb > 0) {
153: int tnb = Math.min(nb, BUFSIZE - nbytes);
154: System.arraycopy(b, offset, buffer, nbytes, tnb);
155: nbytes += tnb;
156: if (nbytes == BUFSIZE)
157: switchBuffer(nbytes);
158: offset += tnb;
159: nb -= tnb;
160: }
161: }
162:
163: public void write(byte[] b) throws IOException {
164: write(b, 0, b.length);
165: }
166:
167: public void flush() throws IOException {
168: if (nbytes > 0)
169: switchBuffer(nbytes);
170: }
171:
172: public void close() throws IOException {
173: flush();
174: enqueueJob(new Runnable() {
175: public void run() {
176: try {
177: fileOutputStream.flush();
178: fileOutputStream.getFD().sync();
179: fileOutputStream.close();
180: } catch (IOException ioe) {
181: throw new BufferedFileException(ioe);
182: }
183: }
184:
185: public String toString() {
186: return "close";
187: }
188: });
189: if (logger.isInfoEnabled())
190: logger.info("Buffered closed");
191: }
192: }
193:
194: private static class BufferedFileException extends RuntimeException {
195: public BufferedFileException(IOException t) {
196: super ("Buffered IOException", t);
197: }
198: }
199:
200: public BufferedFileSystem(Logger ls) {
201: logger = ls;
202: }
203:
204: private void enqueueJob(Runnable job) {
205: synchronized (queue) {
206: queue.add(job);
207: if (thread == null) {
208: active = true;
209: thread = new Thread(this , "BufferedFileSystem");
210: thread.start();
211: }
212: }
213: }
214:
215: public void run() {
216: while (true) {
217: Runnable job;
218: synchronized (queue) {
219: while (queue.size() == 0) {
220: try {
221: queue.wait();
222: } catch (InterruptedException ie) {
223: }
224: if (!active)
225: return;
226: }
227: job = (Runnable) queue.next();
228: executingJob = true;
229: }
230: if (logger.isInfoEnabled())
231: logger.info("Buffered job " + job);
232: try {
233: job.run();
234: } catch (Throwable bfe) {
235: logger.error(bfe.getMessage(), bfe.getCause());
236: }
237: synchronized (queue) {
238: executingJob = false;
239: queue.notifyAll();
240: }
241: }
242: }
243:
244: public void waitForPrevious() {
245: synchronized (queue) {
246: while (queue.size() > 0 || executingJob) {
247: try {
248: queue.wait();
249: } catch (InterruptedException ie) {
250: }
251: }
252: }
253: }
254:
255: public void stop() {
256: synchronized (queue) {
257: active = false;
258: queue.notify();
259: try {
260: thread.join();
261: } catch (InterruptedException ie) {
262: }
263: }
264: }
265:
266: public OutputStream openOutputStream(File file)
267: throws FileNotFoundException {
268: return new BufferedFileOutputStream(new FileOutputStream(file));
269: }
270:
271: public InputStream openInputStream(File file)
272: throws FileNotFoundException {
273: waitForPrevious();
274: return new FileInputStream(file);
275: }
276:
277: public boolean rename(final File from, final File to) {
278: enqueueJob(new Runnable() {
279: public void run() {
280: from.renameTo(to);
281: }
282:
283: public String toString() {
284: return "rename " + from + " to " + to;
285: }
286: });
287: return true;
288: }
289: }
|