001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket.nbio;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.core.*;
029: import seda.sandStorm.lib.aSocket.*;
030: import seda.nbio.*;
031:
032: import java.net.*;
033: import java.io.*;
034: import java.util.*;
035:
036: /**
037: * Internal class used to represent state of an active datagram socket.
038: */
039: public class DatagramSockState extends
040: seda.sandStorm.lib.aSocket.DatagramSockState {
041:
042: private static final boolean DEBUG = false;
043:
044: private NonblockingDatagramSocket dgsock;
045: private SelectItem readsi, writesi;
046:
047: /* AUdpSocket udpsock;
048: private SinkIF readCompQ;
049: private QueueElementIF clogged_qel;
050: private int clogged_numtries;
051: private int readClogTries, writeClogThreshold, maxPacketSize;
052:
053: private byte readBuf[];
054: boolean closed = false;
055: long seqNum = 1;
056: private AUdpInPacket pkt;
057:
058: int outstanding_writes, numEmptyWrites;
059: ssLinkedList writeReqList;
060: AUdpWriteRequest cur_write_req;
061: BufferElement cur_write_buf; */
062:
063: private SelectSource read_selsource;
064: private SelectSource write_selsource;
065:
066: // private BufferElement cur_write_buf;
067:
068: public DatagramSockState(AUdpSocket sock, InetAddress addr, int port)
069: throws IOException {
070: if (DEBUG)
071: System.err.println("DatagramSockState: Constructor called");
072: this .udpsock = sock;
073: this .readCompQ = sock.compQ;
074: this .writeClogThreshold = sock.writeClogThreshold;
075: this .maxPacketSize = sock.maxPacketSize;
076:
077: if (DEBUG)
078: System.err.println("DatagramSockState : setting up socket");
079: this .dgsock = new NonblockingDatagramSocket(port, addr);
080:
081: readBuf = new byte[maxPacketSize];
082: this .write_selsource = null;
083: if (DEBUG)
084: System.err.println("DatagramSockState " + dgsock
085: + ": Const creating readBuf of size "
086: + maxPacketSize);
087:
088: if (DEBUG)
089: System.err.println("DatagramSockState " + dgsock
090: + ": Setting flags");
091: outstanding_writes = 0;
092: numEmptyWrites = 0;
093: writeReqList = new ssLinkedList();
094: clogged_qel = null;
095: clogged_numtries = 0;
096: if (DEBUG)
097: System.err.println("DatagramSockState " + dgsock
098: + ": Const done");
099: }
100:
101: // This is synchronized with close()
102: protected synchronized void readInit(SelectSourceIF read_selsource,
103: SinkIF compQ, int readClogTries) {
104: if (DEBUG)
105: System.err.println("readInit called on " + this );
106: if (DEBUG)
107: System.err.println("read_selsource = " + read_selsource);
108: if (closed)
109: return; // May have been closed already
110: this .readCompQ = compQ;
111: this .readClogTries = readClogTries;
112: this .read_selsource = (SelectSource) read_selsource;
113: readsi = new SelectItem(dgsock, this , Selectable.READ_READY);
114: this .read_selsource.register(readsi);
115: }
116:
117: protected void doRead() {
118: if (DEBUG)
119: System.err.println("DatagramSockState: doRead called");
120:
121: // When using SelectSource, we need this guard, since after closing
122: // a socket we may have outstanding read events still in the queue
123: if (closed)
124: return;
125:
126: if (clogged_qel != null) {
127: // Try to drain the clogged element first
128: if (DEBUG)
129: System.err
130: .println("DatagramSockState: doRead draining clogged element "
131: + clogged_qel);
132: try {
133: readCompQ.enqueue(clogged_qel);
134: } catch (SinkFullException qfe) {
135: // Nope, still clogged
136: if ((readClogTries != -1)
137: && (++clogged_numtries >= readClogTries)) {
138: if (DEBUG)
139: System.err
140: .println("DatagramSockState: warning: readClogTries exceeded, dropping "
141: + clogged_qel);
142: clogged_qel = null;
143: clogged_numtries = 0;
144: } else {
145: // Try again later
146: return;
147: }
148: } catch (SinkException sce) {
149: // Whoops - user went away - just drop
150: this .close(null);
151: }
152: }
153:
154: int len;
155: DatagramPacket p;
156:
157: try {
158: if (DEBUG)
159: System.err
160: .println("DatagramSockState: doRead trying receive");
161: p = new DatagramPacket(readBuf, 0, readBuf.length);
162: len = dgsock.nbReceive(p);
163: if (DEBUG)
164: System.err
165: .println("DatagramSockState: receive returned "
166: + len);
167:
168: if (len == 0) {
169: // Didn't read anything - just drop
170: readsi.revents = 0;
171: return;
172: } else if (len < 0) {
173: // Read failed - assume socket is dead
174: if (DEBUG)
175: System.err
176: .println("dgss.doRead: read failed, sock closed");
177: this .close(readCompQ);
178: readsi.revents = 0;
179: return;
180: }
181: } catch (Exception e) {
182: // Read failed - assume socket is dead
183: if (DEBUG)
184: System.err
185: .println("dgss.doRead: read got IOException: "
186: + e.getMessage());
187: this .close(readCompQ);
188: readsi.revents = 0;
189: return;
190: }
191:
192: if (DEBUG)
193: System.err
194: .println("dgss.doRead: Pushing up new AUdpInPacket, len="
195: + len);
196:
197: pkt = new AUdpInPacket(udpsock, p, seqNum);
198: // 0 is special (indicates no sequence number)
199: seqNum++;
200: if (seqNum == 0)
201: seqNum = 1;
202: readBuf = new byte[maxPacketSize];
203:
204: try {
205: readCompQ.enqueue(pkt);
206: } catch (SinkFullException qfe) {
207: clogged_qel = pkt;
208: clogged_numtries = 0;
209: return;
210: } catch (SinkException sce) {
211: // User has gone away
212: this .close(null);
213: return;
214: }
215: readsi.revents = 0;
216: }
217:
218: // This is synchronized with close() to avoid a race with close()
219: // removing the writeReqList while this method is being called.
220: // Probably a better way to do this...
221: protected synchronized boolean addWriteRequest(aSocketRequest req,
222: SourceIF write_selsource) {
223: if (closed)
224: return false;
225:
226: if (DEBUG)
227: System.err
228: .println("DatagramSockState: addWriteRequest called");
229:
230: if (this .write_selsource == null) {
231: if (DEBUG)
232: System.err
233: .println("DatagramSockState: Setting selsource to "
234: + write_selsource);
235: this .write_selsource = (SelectSource) write_selsource;
236: writesi = new SelectItem(dgsock, this ,
237: Selectable.WRITE_READY);
238: this .write_selsource.register(writesi);
239: if (DEBUG)
240: System.err
241: .println("SockState: Registered with selsource");
242: } else if (this .outstanding_writes == 0) {
243: numEmptyWrites = 0;
244: writeMaskEnable();
245: }
246:
247: if ((writeClogThreshold != -1)
248: && (this .outstanding_writes > writeClogThreshold)) {
249: if (DEBUG)
250: System.err
251: .println("DatagramSockState: warning: writeClogThreshold exceeded, dropping "
252: + req);
253: if (req instanceof AUdpWriteRequest)
254: return false;
255: if (req instanceof AUdpCloseRequest) {
256: // Do immediate close: Assume socket is clogged
257: AUdpCloseRequest creq = (AUdpCloseRequest) req;
258: this .close(creq.compQ);
259: return true;
260: }
261: }
262:
263: if (DEBUG)
264: System.err
265: .println("DatagramSockState: Adding writeReq to tail");
266: writeReqList.add_to_tail(req);
267: this .outstanding_writes++;
268: return true;
269: }
270:
271: void initWrite(AUdpWriteRequest req) {
272: this .cur_write_req = req;
273: this .cur_write_buf = req.buf;
274: }
275:
276: protected boolean tryWrite() throws SinkClosedException {
277: int ret;
278: DatagramPacket outgoing;
279:
280: try {
281: if (cur_write_buf instanceof AUdpPacket) {
282: AUdpPacket udpp = (AUdpPacket) cur_write_buf;
283: outgoing = new DatagramPacket(udpp.data, udpp.offset,
284: udpp.size, udpp.address, udpp.port);
285: } else {
286: outgoing = new DatagramPacket(cur_write_buf.data,
287: cur_write_buf.offset, cur_write_buf.size);
288: }
289: ret = dgsock.nbSend(outgoing);
290: } catch (IOException ioe) {
291: // Assume this is because socket was already closed
292: this .close(null);
293: throw new SinkClosedException(
294: "DatagramSockState: tryWrite got exception doing write: "
295: + ioe.getMessage());
296: }
297: if (ret == cur_write_buf.size)
298: return true;
299: else
300: return false;
301: }
302:
303: void writeReset() {
304: this .cur_write_req = null;
305: this .outstanding_writes--;
306: }
307:
308: protected void writeMaskEnable() {
309: writesi.events |= Selectable.WRITE_READY;
310: write_selsource.update(writesi);
311: }
312:
313: protected void writeMaskDisable() {
314: writesi.events &= ~(Selectable.WRITE_READY);
315: write_selsource.update(writesi);
316: }
317:
318: boolean isClosed() {
319: return closed;
320: }
321:
322: // This is synchronized to avoid close() interfering with
323: // addWriteRequest
324: protected synchronized void close(SinkIF closeEventQueue) {
325: if (closed)
326: return;
327:
328: closed = true;
329:
330: if (DEBUG)
331: System.err
332: .println("DatagramSockState.close(): Deregistering with selsources");
333: if (read_selsource != null)
334: read_selsource.deregister(readsi);
335: if (write_selsource != null)
336: write_selsource.deregister(writesi);
337: if (DEBUG)
338: System.err
339: .println("DatagramSockState.close(): done deregistering with selsources");
340: // Eliminate write queue
341:
342: writeReqList = null;
343:
344: if (DEBUG)
345: System.err
346: .println("DatagramSockState.close(): doing close");
347: dgsock.close();
348:
349: if (closeEventQueue != null) {
350: SinkClosedEvent sce = new SinkClosedEvent(udpsock);
351: closeEventQueue.enqueue_lossy(sce);
352: }
353: }
354:
355: public String toString() {
356: return "DatagramSockState [" + dgsock + "]";
357: }
358:
359: protected DatagramSocket getSocket() {
360: return dgsock;
361: }
362:
363: protected void connect(InetAddress addr, int port) {
364: dgsock.connect(addr, port);
365: }
366:
367: }
|