/*
 * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
 * California. All rights reserved.
 *
 * Permission to use, copy, modify, and distribute this software and its
 * documentation for any purpose, without fee, and without written agreement is
 * hereby granted, provided that the above copyright notice and the following
 * two paragraphs appear in all copies of this software.
 *
 * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
 * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
 * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
 * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 *
 * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
 * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
 * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
 * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
 *
 * Author: Matt Welsh <mdw@cs.berkeley.edu>
 *
 */
024:
025: package seda.sandStorm.lib.aDisk;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import seda.sandStorm.internal.*;
031: import seda.sandStorm.main.*;
032: import java.io.*;
033: import java.util.*;
034:
035: /**
036: * This is the ThreadManager implementation for AFileTPImpl.
037: * It manages a pool of threads which perform blocking I/O
038: * on disk files; this is a portable implementation and is not
039: * meant to be high performance.
040: *
041: * @author Matt Welsh
042: */
043: class AFileTPTM extends TPSThreadManager implements ThreadManagerIF,
044: ProfilableIF {
045:
046: private static final boolean DEBUG = false;
047:
048: // Global queue for files with pending entries
049: private FiniteQueue fileQ;
050: // Count of outstanding file requests, derived from length of
051: // queue of each file on fileQ
052: private int numOutstandingRequests;
053:
054: // Maximum number of consecutive requests to service per file
055: private static final int MAX_REQUESTS_PER_FILE = 10;
056: // Block time for file queue
057: private static final int QUEUE_BLOCK_TIME = 1000;
058:
059: AFileTPTM(ManagerIF mgr, SystemManagerIF sysmgr) throws Exception {
060: super (mgr, false);
061:
062: if (DEBUG)
063: System.err.println("AFileTPTM: Created");
064:
065: if (config
066: .getBoolean("global.aDisk.threadPool.sizeController.enable")) {
067: sizeController = new ThreadPoolController(
068: mgr,
069: config
070: .getInt("global.aDisk.threadPool.sizeController.delay"),
071: config
072: .getInt("global.aDisk.threadPool.sizeController.threshold"));
073: }
074:
075: fileQ = new FiniteQueue();
076: numOutstandingRequests = 0;
077: sysmgr.addThreadManager("AFileTPTM", this );
078: AFileTPStageWrapper sw = new AFileTPStageWrapper(
079: "AFileTPTM Stage", null, new ConfigData(mgr), this );
080: StageIF theStage = sysmgr.createStage(sw, true);
081:
082: if (mgr.getProfiler() != null) {
083: mgr.getProfiler().add("AFileTPTM outstanding reqs", this );
084: }
085: }
086:
087: /**
088: * Register a stage with this thread manager.
089: */
090: public void register(StageWrapperIF stage) {
091: // Create a single threadPool - only one stage registered with us
092: AFileTPThread at = new AFileTPThread(
093: (AFileTPStageWrapper) stage);
094: SandstormConfig config = mgr.getConfig();
095: ThreadPool tp = new ThreadPool(
096: stage,
097: mgr,
098: at,
099: config.getInt("global.aDisk.threadPool.initialThreads"),
100: config.getInt("global.aDisk.threadPool.minThreads"),
101: config.getInt("global.aDisk.threadPool.maxThreads"),
102: config.getInt("global.threadPool.blockTime"),
103: config
104: .getInt("global.threadPool.sizeController.idleTimeThreshold"));
105: at.registerTP(tp);
106: // Use numOutstandingRequests as metric
107: if (sizeController != null)
108: sizeController.register(stage, tp, this );
109: tp.start();
110: }
111:
112: /**
113: * Indicate that a file has pending events.
114: */
115: public void fileReady(AFileTPImpl impl) {
116: try {
117: fileQueueEntry fqe = new fileQueueEntry(impl);
118: fileQ.enqueue(fqe);
119: synchronized (fileQ) {
120: numOutstandingRequests += fqe.size;
121: }
122: } catch (SinkException se) {
123: throw new InternalError(
124: "AFileTPTM.fileReady() got SinkException -- this should not happen, please contact <mdw@cs.berkeley.edu>");
125: }
126: }
127:
128: // Return the number of outstanding elements, for profiling
129: public int profileSize() {
130: return numOutstandingRequests;
131: }
132:
133: // Used to keep track of number of elements on fileQ
134: class fileQueueEntry implements QueueElementIF {
135: AFileTPImpl impl;
136: int size;
137:
138: fileQueueEntry(AFileTPImpl impl) {
139: this .impl = impl;
140: this .size = ((SourceIF) impl.getQueue()).size();
141: }
142: }
143:
144: /**
145: * Internal class representing a single AFileTPTM-managed thread.
146: */
147: class AFileTPThread extends TPSThreadManager.stageRunnable
148: implements Runnable {
149:
150: AFileTPThread(AFileTPStageWrapper wrapper) {
151: super (wrapper, null);
152: }
153:
154: public void registerTP(ThreadPool tp) {
155: this .tp = tp;
156: }
157:
158: public void run() {
159: int blockTime;
160: long t1, t2;
161:
162: if (DEBUG)
163: System.err.println(name + ": starting");
164:
165: t1 = System.currentTimeMillis();
166:
167: while (true) {
168:
169: try {
170:
171: blockTime = (int) tp.getBlockTime();
172:
173: AFileTPImpl impl;
174: fileQueueEntry fqe = (fileQueueEntry) fileQ
175: .blocking_dequeue(blockTime);
176: if (fqe == null) {
177: t2 = System.currentTimeMillis();
178: if (tp.timeToStop(t2 - t1)) {
179: if (DEBUG)
180: System.err.println(name + ": Exiting");
181: return;
182: }
183: continue;
184: }
185: t1 = System.currentTimeMillis();
186:
187: impl = fqe.impl;
188: synchronized (fileQ) {
189: numOutstandingRequests -= fqe.size;
190: }
191:
192: int n = 0;
193:
194: while (n < MAX_REQUESTS_PER_FILE) {
195: AFileRequest req = (AFileRequest) impl
196: .getQueue().dequeue();
197: if (req == null)
198: break;
199: processRequest(req);
200: n++;
201: }
202: // If events still pending, place back on file queue
203: if (((SourceIF) impl.getQueue()).size() != 0)
204: fileReady(impl);
205:
206: Thread.currentThread().yield();
207:
208: } catch (Exception e) {
209: System.err.println(name + ": got exception " + e);
210: e.printStackTrace();
211: }
212: }
213: }
214:
215: private void processRequest(AFileRequest req) {
216: if (DEBUG)
217: System.err
218: .println(name + " processing request: " + req);
219:
220: // Read request
221: if (req instanceof AFileReadRequest) {
222: AFileReadRequest rreq = (AFileReadRequest) req;
223: AFileTPImpl impl = (AFileTPImpl) rreq.getImpl();
224: RandomAccessFile raf = impl.raf;
225: BufferElement buf = rreq.buf;
226: try {
227: int c = raf.read(buf.data, buf.offset, buf.size);
228: if (c == -1) {
229: req.complete(new AFileEOFReached(req));
230: } else if (c < buf.size) {
231: // This can occur if buf.size is less than the size of the file
232: req.complete(new AFileIOCompleted(req, c));
233: req.complete(new AFileEOFReached(req));
234: } else {
235: req
236: .complete(new AFileIOCompleted(req,
237: buf.size));
238: }
239: } catch (IOException ioe) {
240: req
241: .complete(new AFileIOExceptionOccurred(req,
242: ioe));
243: }
244:
245: // Write request
246: } else if (req instanceof AFileWriteRequest) {
247: AFileWriteRequest wreq = (AFileWriteRequest) req;
248: AFileTPImpl impl = (AFileTPImpl) wreq.getImpl();
249: RandomAccessFile raf = impl.raf;
250: BufferElement buf = wreq.buf;
251: try {
252: raf.write(buf.data, buf.offset, buf.size);
253: req.complete(new AFileIOCompleted(req, buf.size));
254: } catch (IOException ioe) {
255: req
256: .complete(new AFileIOExceptionOccurred(req,
257: ioe));
258: }
259:
260: // Seek request
261: } else if (req instanceof AFileSeekRequest) {
262: AFileSeekRequest sreq = (AFileSeekRequest) req;
263: AFileTPImpl impl = (AFileTPImpl) sreq.getImpl();
264: RandomAccessFile raf = impl.raf;
265: try {
266: raf.seek(sreq.offset);
267: } catch (IOException ioe) {
268: req
269: .complete(new AFileIOExceptionOccurred(req,
270: ioe));
271: }
272:
273: // Close request
274: } else if (req instanceof AFileCloseRequest) {
275: AFileCloseRequest creq = (AFileCloseRequest) req;
276: AFileTPImpl impl = (AFileTPImpl) creq.getImpl();
277: RandomAccessFile raf = impl.raf;
278: try {
279: raf.close();
280: } catch (IOException ioe) {
281: req
282: .complete(new AFileIOExceptionOccurred(req,
283: ioe));
284: }
285: req.complete(new SinkClosedEvent(req.afile));
286:
287: // Flush request
288: } else if (req instanceof AFileFlushRequest) {
289: // Don't know how to flush an RAF
290: req.complete(new SinkFlushedEvent(req.afile));
291:
292: } else {
293: throw new Error(
294: "AFileTPTM.AFileTPThread.processRequest got bad request: "
295: + req);
296: }
297:
298: }
299: }
300:
301: }
|