001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket.nbio;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.core.*;
029: import seda.sandStorm.lib.aSocket.*;
030: import seda.nbio.*;
031:
032: import java.net.*;
033: import java.io.*;
034: import java.util.*;
035:
036: /**
037: * Internal class used to represent state of an active socket connection.
038: */
039: class SockState extends seda.sandStorm.lib.aSocket.SockState {
040:
041: private static final boolean DEBUG = false;
042:
043: private NonblockingInputStream nbis;
044: private NonblockingOutputStream nbos;
045: private SelectItem readsi, writesi;
046:
047: private SelectSource read_selsource;
048: private SelectSource write_selsource;
049:
050: public SockState(ATcpConnection conn, Socket nbsock,
051: int writeClogThreshold) throws IOException {
052: if (DEBUG)
053: System.err.println("SockState: Constructor called with "
054: + conn + ", " + nbsock + ", " + writeClogThreshold);
055: this .conn = conn;
056: this .nbsock = nbsock;
057: this .writeClogThreshold = writeClogThreshold;
058: this .write_selsource = null;
059:
060: if (DEBUG)
061: System.err.println("Sockstate " + nbsock
062: + ": Const creating nbis");
063: nbis = (NonblockingInputStream) nbsock.getInputStream();
064: if (DEBUG)
065: System.err.println("Sockstate " + nbsock
066: + ": Const creating nbos");
067: nbos = (NonblockingOutputStream) nbsock.getOutputStream();
068:
069: if (DEBUG)
070: System.err.println("SockState " + nbsock
071: + ": Const creating readBuf of size "
072: + aSocketConst.READ_BUFFER_SIZE);
073: readBuf = new byte[aSocketConst.READ_BUFFER_SIZE];
074:
075: if (DEBUG)
076: System.err.println("SockState " + nbsock
077: + ": Setting flags");
078: outstanding_writes = 0;
079: numEmptyWrites = 0;
080: writeReqList = new ssLinkedList();
081:
082: clogged_qel = null;
083: clogged_numtries = 0;
084: if (DEBUG)
085: System.err.println("SockState " + nbsock + ": Const done");
086: }
087:
088: public SockState(ATcpConnection conn, Socket nbsock,
089: Integer writeClogThreshold) throws IOException {
090: this (conn, nbsock, writeClogThreshold.intValue());
091: }
092:
093: // This is synchronized with close()
094: protected synchronized void readInit(SelectSourceIF read_selsource,
095: SinkIF compQ, int readClogTries) {
096: if (DEBUG)
097: System.err.println("readInit called on " + this );
098: if (closed)
099: return; // May have been closed already
100: this .read_selsource = (SelectSource) read_selsource;
101: this .readCompQ = compQ;
102: this .readClogTries = readClogTries;
103: readsi = new SelectItem((NonblockingSocket) nbsock, this ,
104: Selectable.READ_READY);
105: this .read_selsource.register(readsi);
106: }
107:
108: protected void doRead() {
109: if (DEBUG)
110: System.err.println("SockState: doRead called");
111:
112: // When using SelectSource, we need this guard, since after closing
113: // a socket we may have outstanding read events still in the queue
114: if (closed)
115: return;
116:
117: if (clogged_qel != null) {
118: // Try to drain the clogged element first
119: if (DEBUG)
120: System.err
121: .println("SockState: doRead draining clogged element "
122: + clogged_qel);
123: try {
124: readCompQ.enqueue(clogged_qel);
125: clogged_qel = null;
126: clogged_numtries = 0;
127: } catch (SinkFullException qfe) {
128: // Nope, still clogged
129: if ((readClogTries != -1)
130: && (++clogged_numtries >= readClogTries)) {
131: if (DEBUG)
132: System.err
133: .println("SockState: warning: readClogTries exceeded, dropping "
134: + clogged_qel);
135: clogged_qel = null;
136: clogged_numtries = 0;
137: } else {
138: // Try again later
139: return;
140: }
141: } catch (SinkException sce) {
142: // Whoops - user went away - just drop
143: this .close(null);
144: }
145: }
146:
147: int len;
148:
149: try {
150: if (DEBUG)
151: System.err.println("SockState: doRead trying read");
152: len = nbis.read(readBuf, 0, READ_BUFFER_SIZE);
153: if (DEBUG)
154: System.err.println("SockState: read returned " + len);
155:
156: if (len == 0) {
157: // XXX MDW: Sometimes we get an error return result from
158: // poll() which causes an attempted read here, but no
159: // IOException. For now I am going to just drop the "null"
160: // packet - on Linux it seems that certain TCP errors can
161: // trigger this.
162: //System.err.println("ss.doRead: Warning: Got empty read on socket");
163: readsi.revents = 0;
164: return;
165: } else if (len < 0) {
166: // Read failed - assume socket is dead
167: if (DEBUG)
168: System.err
169: .println("ss.doRead: read failed, sock closed");
170: this .close(readCompQ);
171: readsi.revents = 0;
172: return;
173: }
174: } catch (Exception e) {
175: // Read failed - assume socket is dead
176: if (DEBUG)
177: System.err.println("ss.doRead: read got IOException: "
178: + e.getMessage());
179: this .close(readCompQ);
180: readsi.revents = 0;
181: return;
182: }
183:
184: if (DEBUG)
185: System.err
186: .println("ss.doRead: Pushing up new ATcpInPacket, len="
187: + len);
188:
189: pkt = new ATcpInPacket(conn, readBuf, len,
190: aSocketConst.READ_BUFFER_COPY, seqNum);
191: // 0 is special (indicates no sequence number)
192: seqNum++;
193: if (seqNum == 0)
194: seqNum = 1;
195: if (aSocketConst.READ_BUFFER_COPY == false) {
196: readBuf = new byte[aSocketConst.READ_BUFFER_SIZE];
197: }
198:
199: try {
200: readCompQ.enqueue(pkt);
201: } catch (SinkFullException qfe) {
202: clogged_qel = pkt;
203: clogged_numtries = 0;
204: return;
205: } catch (SinkException sce) {
206: // User has gone away
207: this .close(null);
208: return;
209: }
210: readsi.revents = 0;
211: }
212:
213: // XXX This is synchronized with close() to avoid a race with close()
214: // removing the writeReqList while this method is being called.
215: // Probably a better way to do this...
216: protected synchronized boolean addWriteRequest(aSocketRequest req,
217: SelectSourceIF write_selsource) {
218: if (closed)
219: return false;
220:
221: if (DEBUG)
222: System.err.println("SockState: addWriteRequest called");
223:
224: if (this .write_selsource == null) {
225: if (DEBUG)
226: System.err.println("SockState: Setting selsource to "
227: + write_selsource);
228: this .write_selsource = (SelectSource) write_selsource;
229: writesi = new SelectItem((NonblockingSocket) nbsock, this ,
230: Selectable.WRITE_READY);
231: ((SelectSource) write_selsource).register(writesi);
232: numActiveWriteSockets++;
233: if (DEBUG)
234: System.err
235: .println("SockState: Registered with selsource");
236: } else if (this .outstanding_writes == 0) {
237: numEmptyWrites = 0;
238: writeMaskEnable();
239: }
240:
241: if ((writeClogThreshold != -1)
242: && (this .outstanding_writes > writeClogThreshold)) {
243: if (DEBUG)
244: System.err
245: .println("SockState: warning: writeClogThreshold exceeded, dropping "
246: + req);
247: if (req instanceof ATcpWriteRequest)
248: return false;
249: if (req instanceof ATcpCloseRequest) {
250: // Do immediate close: Assume socket is clogged
251: ATcpCloseRequest creq = (ATcpCloseRequest) req;
252: this .close(creq.compQ);
253: return true;
254: }
255: }
256:
257: if (DEBUG)
258: System.err.println("SockState: Adding writeReq to tail");
259: writeReqList.add_to_tail(req);
260: this .outstanding_writes++;
261: return true;
262: }
263:
264: protected void initWrite(ATcpWriteRequest req) {
265: this .cur_write_req = req;
266: this .writeBuf = req.buf.data;
267: this .cur_offset = req.buf.offset;
268: this .cur_length_target = req.buf.size + cur_offset;
269: }
270:
271: protected boolean tryWrite() throws SinkClosedException {
272: try {
273: int tryLen;
274: if (MAX_WRITE_LEN == -1) {
275: tryLen = cur_length_target - cur_offset;
276: } else {
277: tryLen = Math.min(cur_length_target - cur_offset,
278: MAX_WRITE_LEN);
279: }
280: cur_offset += nbos.nbWrite(writeBuf, cur_offset, tryLen);
281: if (DEBUG)
282: System.err.println("SockState: tryWrite() of " + tryLen
283: + " bytes (len=" + cur_length_target + ", off="
284: + cur_offset);
285:
286: } catch (IOException ioe) {
287: // Assume this is because socket was already closed
288: this .close(null);
289: throw new SinkClosedException(
290: "tryWrite got exception doing write: "
291: + ioe.getMessage());
292: }
293: if (cur_offset == cur_length_target) {
294: if (DEBUG)
295: System.err
296: .println("SockState: tryWrite() completed write of "
297: + cur_length_target + " bytes");
298: return true;
299: } else
300: return false;
301: }
302:
303: protected void writeMaskEnable() {
304: numActiveWriteSockets++;
305: writesi.events |= Selectable.WRITE_READY;
306: write_selsource.update(writesi);
307: }
308:
309: protected void writeMaskDisable() {
310: numActiveWriteSockets--;
311: writesi.events &= ~(Selectable.WRITE_READY);
312: write_selsource.update(writesi);
313: }
314:
315: // XXX This is synchronized to avoid close() interfering with
316: // addWriteRequest
317: protected synchronized void close(SinkIF closeEventQueue) {
318: if (closed)
319: return;
320:
321: closed = true;
322:
323: if (DEBUG)
324: System.err
325: .println("SockState.close(): Deregistering with selsources");
326: if (read_selsource != null)
327: read_selsource.deregister(readsi);
328: if (write_selsource != null)
329: write_selsource.deregister(writesi);
330: if (DEBUG)
331: System.err
332: .println("SockState.close(): done deregistering with selsources");
333: // Eliminate write queue
334:
335: // XXX XXX XXX MDW: This introduces a race condition with
336: // addWriteRequest() -- need to serialize close() with other
337: // queue operations on the socket.
338: writeReqList = null;
339:
340: try {
341: if (DEBUG)
342: System.err.println("SockState.close(): doing close ["
343: + nbsock + "]");
344: nbsock.close();
345: } catch (IOException e) {
346: // Do nothing
347: }
348:
349: if (closeEventQueue != null) {
350: SinkClosedEvent sce = new SinkClosedEvent(conn);
351: closeEventQueue.enqueue_lossy(sce);
352: }
353: }
354:
355: }
|