001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aDisk;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.core.*;
029: import java.io.*;
030:
031: /**
032: * This is an implementation of AFile which uses a pool of threads
033: * which perform blocking I/O (through the java.io.RandomAccessFile
034: * class) on files. This is a portable implementation but is not
035: * intended to be high-performance.
036: *
037: * @author Matt Welsh
038: * @see AFile
039: */
040: class AFileTPImpl extends AFileImpl implements QueueElementIF {
041:
042: private File f;
043: RandomAccessFile raf;
044: private AFile afile;
045: private AFileTPTM tm;
046: private SinkIF compQ;
047: private FiniteQueue eventQ;
048: private boolean readOnly;
049: private boolean closed;
050:
051: /**
052: * Create an AFileTPIMpl with the given AFile, filename, completion
053: * queue, create/readOnly flags, and Thread Manager.
054: */
055: AFileTPImpl(AFile afile, String fname, SinkIF compQ,
056: boolean create, boolean readOnly, AFileTPTM tm)
057: throws IOException {
058: this .afile = afile;
059: this .tm = tm;
060: this .compQ = compQ;
061: this .readOnly = readOnly;
062:
063: eventQ = new FiniteQueue();
064:
065: f = new File(fname);
066: if (!f.exists() && !create) {
067: throw new FileNotFoundException("File not found: " + fname);
068: }
069: if (f.isDirectory()) {
070: throw new FileIsDirectoryException("Is a directory: "
071: + fname);
072: }
073:
074: if (readOnly) {
075: raf = new RandomAccessFile(f, "r");
076: } else {
077: raf = new RandomAccessFile(f, "rw");
078: }
079: closed = false;
080: }
081:
082: /**
083: * Enqueues the given request (which must be an AFileRequest)
084: * to the file.
085: */
086: public void enqueue(QueueElementIF req) throws SinkException {
087: AFileRequest areq = (AFileRequest) req;
088: if (closed) {
089: throw new SinkClosedException("Sink is closed");
090: }
091: if (readOnly && (areq instanceof AFileWriteRequest)) {
092: throw new BadQueueElementException(
093: "Cannot enqueue write request for read-only file",
094: areq);
095: }
096: areq.afile = afile;
097: try {
098: eventQ.enqueue(areq);
099: } catch (SinkException se) {
100: throw new InternalError(
101: "AFileTPImpl.enqueue got SinkException - this should not happen, please contact <mdw@cs.berkeley.edu>");
102: }
103: if (eventQ.size() == 1) {
104: tm.fileReady(this );
105: }
106: }
107:
108: /**
109: * Enqueues the given request (which must be an AFileRequest)
110: * to the file.
111: */
112: public boolean enqueue_lossy(QueueElementIF req) {
113: AFileRequest areq = (AFileRequest) req;
114: if (closed || (readOnly && (areq instanceof AFileWriteRequest))) {
115: return false;
116: }
117: areq.afile = afile;
118: try {
119: eventQ.enqueue(areq);
120: } catch (SinkException se) {
121: throw new InternalError(
122: "AFileTPImpl.enqueue got SinkException - this should not happen, please contact <mdw@cs.berkeley.edu>");
123: }
124: if (eventQ.size() == 1) {
125: tm.fileReady(this );
126: }
127: return true;
128: }
129:
130: /**
131: * Enqueues the given requests (which must be AFileRequests)
132: * to the file.
133: */
134: public void enqueue_many(QueueElementIF[] elements)
135: throws SinkException {
136: if (closed) {
137: throw new SinkClosedException("Sink is closed");
138: }
139: for (int i = 0; i < elements.length; i++) {
140: enqueue(elements[i]);
141: }
142: }
143:
144: /**
145: * Return information on the properties of the file.
146: */
147: AFileStat stat() {
148: AFileStat s = new AFileStat();
149: s.afile = afile;
150: s.isDirectory = f.isDirectory();
151: s.canRead = f.canRead();
152: s.canWrite = f.canWrite();
153: s.length = f.length();
154: return s;
155: }
156:
157: /**
158: * Close the file after all enqueued requests have completed.
159: * Disallows any additional requests to be enqueued on this file.
160: * A SinkClosedEvent will be posted on the file's completion queue
161: * when the close is complete.
162: */
163: public void close() {
164: enqueue_lossy(new AFileCloseRequest(afile, compQ));
165: closed = true;
166: }
167:
168: /**
169: * Causes a SinkFlushedEvent to be posted on the file's completion queue
170: * when all pending requests have completed.
171: */
172: public void flush() {
173: enqueue_lossy(new AFileFlushRequest(afile, compQ));
174: }
175:
176: /**
177: * Return the per-file event queue.
178: */
179: QueueIF getQueue() {
180: return eventQ;
181: }
182:
183: }
|