001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.core.*;
029:
030: import java.net.*;
031: import java.io.*;
032: import java.util.*;
033:
034: /**
035: * Internal event handler used to handle socket write events.
036: */
037: class WriteEventHandler extends aSocketEventHandler implements
038: EventHandlerIF, aSocketConst {
039:
040: private static final boolean DEBUG = false;
041:
042: WriteEventHandler() {
043: }
044:
045: public void init(ConfigDataIF config) {
046: }
047:
048: public void destroy() {
049: }
050:
051: private void processConnection(ConnectSockState css)
052: throws IOException {
053: if (DEBUG)
054: System.err
055: .println("WriteThread: processConnection called for "
056: + css);
057: css.complete();
058: }
059:
060: private void processTcpWrite(SockState ss) throws IOException {
061: if (DEBUG)
062: System.err
063: .println("WriteEventHandler: processTcpWrite called");
064:
065: // Socket already closed; just forget about it
066: if (ss.closed)
067: return;
068:
069: // Process queue of requests
070: if (DEBUG)
071: System.err.println("WriteEventHandler: " + ss + " has "
072: + ss.outstanding_writes + " pending requests");
073: if (ss.outstanding_writes == 0) {
074: ss.numEmptyWrites++;
075: if ((WRITE_MASK_DISABLE_THRESHOLD != -1)
076: && (ss.numEmptyWrites >= WRITE_MASK_DISABLE_THRESHOLD)) {
077: ss.writeMaskDisable();
078: }
079: if (DEBUG)
080: System.err
081: .println("WriteEventHandler: Socket has no pending writes, numEmptyWrites="
082: + ss.numEmptyWrites);
083: return;
084: }
085:
086: aSocketRequest req;
087:
088: // Avoid doing too many things on each socket
089: int num_reqs_processed = 0;
090: while (ss.writeReqList != null
091: && // JRVB: this can happen if someone closes the socket while we are processing writes.
092: ((req = (aSocketRequest) ss.writeReqList.get_head()) != null)
093: && (++num_reqs_processed < MAX_WRITE_REQS_PER_SOCKET)) {
094:
095: if (DEBUG)
096: System.err.println("Processing " + req + " ("
097: + num_reqs_processed + ")");
098:
099: if (req instanceof ATcpWriteRequest) {
100: // Handle write request
101: if (DEBUG)
102: System.err
103: .println("WriteEventHandler: Processing ATcpWriteRequest");
104: ATcpWriteRequest wreq = (ATcpWriteRequest) req;
105:
106: // Skip if locked
107: if ((ss.cur_write_req != null)
108: && (ss.cur_write_req != req))
109: break;
110:
111: if (ss.cur_write_req == null) {
112: if (DEBUG)
113: System.err
114: .println("WriteEventHandler: Doing initWrite");
115: ss.initWrite((ATcpWriteRequest) req);
116: }
117:
118: boolean done = false;
119: int c = 0;
120:
121: // Try hard to finish this packet
122: try {
123: while ((!(done = ss.tryWrite()))
124: && (c++ < TRYWRITE_SPIN))
125: ;
126: } catch (SinkClosedException sde) {
127: // OK, the socket closed underneath us
128: // XXX MDW: Taking this out for now - expect the SinkClosedEvent
129: // to be pushed up when read() fails
130:
131: //SinkIF cq = wreq.buf.getCompletionQueue();
132: //if (cq != null) {
133: // SinkClosedEvent sce = new SinkClosedEvent(wreq.conn);
134: // cq.enqueue_lossy(sce);
135: //}
136: }
137:
138: if (done) {
139: if (DEBUG)
140: System.err
141: .println("WriteEventHandler: Finished write");
142: // Finished this write
143: ss.writeReset();
144:
145: // Send completion upcall
146: SinkIF cq = wreq.buf.getCompletionQueue();
147: if (cq != null) {
148: SinkDrainedEvent sde = new SinkDrainedEvent(
149: ss.conn, wreq.buf);
150: cq.enqueue_lossy(sde);
151: }
152:
153: // Clear the request
154: if (!ss.isClosed()) {
155: ss.writeReqList.remove_head();
156: } else {
157: return; // Nothing more to do
158: }
159:
160: } else {
161: if (DEBUG)
162: System.err
163: .println("WriteEventHandler: Write not completed");
164: break; // Don't want to process anything else here
165: }
166:
167: } else if (req instanceof ATcpFlushRequest) {
168:
169: ATcpFlushRequest freq = (ATcpFlushRequest) req;
170:
171: // Skip if locked
172: if ((ss.cur_write_req != null)
173: && (ss.cur_write_req != req))
174: break;
175:
176: // OK - by the time we have the lock we can claim the flush is done
177: if (freq.compQ != null) {
178: // JRVB: added check to avoid NullPointerException
179: SinkFlushedEvent sfe = new SinkFlushedEvent(
180: freq.conn);
181: freq.compQ.enqueue_lossy(sfe);
182: }
183:
184: // Clear the request
185: if (!ss.isClosed()) {
186: ss.writeReqList.remove_head();
187: ss.writeReset();
188: } else {
189: return; // Nothing more to do
190: }
191:
192: } else if (req instanceof ATcpCloseRequest) {
193:
194: ATcpCloseRequest creq = (ATcpCloseRequest) req;
195:
196: // Skip if locked
197: if ((ss.cur_write_req != null)
198: && (ss.cur_write_req != req))
199: break;
200:
201: // OK - by the time we have the lock we can claim the close is done
202: ss.close(creq.compQ);
203:
204: return;
205:
206: } else {
207: throw new IllegalArgumentException(
208: "Invalid incoming request to WriteEventHandler: "
209: + req);
210: }
211: }
212:
213: if (DEBUG)
214: System.err.println("WriteEventHandler: Processed "
215: + num_reqs_processed + " writes in one go");
216:
217: }
218:
219: private void processUdpWrite(DatagramSockState ss)
220: throws IOException {
221:
222: if (DEBUG)
223: System.err
224: .println("WriteEventHandler: processUdpWrite called");
225:
226: // Socket already closed; just forget about it
227: if (ss.closed)
228: return;
229:
230: // Process queue of requests
231: if (DEBUG)
232: System.err.println("WriteEventHandler: " + ss + " has "
233: + ss.outstanding_writes + " pending requests");
234: if (ss.outstanding_writes == 0) {
235: ss.numEmptyWrites++;
236: if ((WRITE_MASK_DISABLE_THRESHOLD != -1)
237: && (ss.numEmptyWrites >= WRITE_MASK_DISABLE_THRESHOLD)) {
238: ss.writeMaskDisable();
239: }
240: if (DEBUG)
241: System.err
242: .println("WriteEventHandler: Socket has no pending writes, numEmptyWrites="
243: + ss.numEmptyWrites);
244: return;
245: }
246:
247: aSocketRequest req;
248:
249: // Avoid doing too many things on each socket
250: int num_reqs_processed = 0;
251: while (((req = (aSocketRequest) ss.writeReqList.get_head()) != null)
252: && (++num_reqs_processed < MAX_WRITE_REQS_PER_SOCKET)) {
253:
254: if (DEBUG)
255: System.err.println("Processing " + req + " ("
256: + num_reqs_processed + ")");
257:
258: if (req instanceof AUdpWriteRequest) {
259: // Handle write request
260: if (DEBUG)
261: System.err
262: .println("WriteEventHandler: Processing AUdpWriteRequest");
263: AUdpWriteRequest wreq = (AUdpWriteRequest) req;
264:
265: // Skip if locked
266: if ((ss.cur_write_req != null)
267: && (ss.cur_write_req != req))
268: break;
269:
270: if (ss.cur_write_req == null) {
271: if (DEBUG)
272: System.err
273: .println("WriteEventHandler: Doing initWrite");
274: ss.initWrite(wreq);
275: }
276:
277: boolean done = false;
278: int c = 0;
279:
280: // Try hard to finish this packet
281: try {
282: while ((!(done = ss.tryWrite()))
283: && (c++ < TRYWRITE_SPIN))
284: ;
285: } catch (SinkClosedException sde) {
286: // Ignore - expect the SinkClosedEvent to be pushed up when
287: // receive() fails
288: }
289:
290: if (done) {
291: if (DEBUG)
292: System.err
293: .println("WriteEventHandler: Finished write");
294: // Finished this write
295: ss.writeReset();
296:
297: // Send completion upcall
298: SinkIF cq = wreq.buf.getCompletionQueue();
299: if (cq != null) {
300: SinkDrainedEvent sde = new SinkDrainedEvent(
301: ss.udpsock, wreq.buf);
302: cq.enqueue_lossy(sde);
303: }
304:
305: // Clear the request
306: if (!ss.isClosed()) {
307: ss.writeReqList.remove_head();
308: } else {
309: return; // Nothing more to do
310: }
311:
312: } else {
313: if (DEBUG)
314: System.err
315: .println("WriteEventHandler: Write not completed");
316: break; // Don't want to process anything else here
317: }
318:
319: } else if (req instanceof AUdpFlushRequest) {
320:
321: AUdpFlushRequest freq = (AUdpFlushRequest) req;
322:
323: // Skip if locked
324: if ((ss.cur_write_req != null)
325: && (ss.cur_write_req != req))
326: break;
327:
328: // OK - by the time we have the lock we can claim the flush is done
329: SinkFlushedEvent sfe = new SinkFlushedEvent(freq.sock);
330: freq.compQ.enqueue_lossy(sfe);
331:
332: // Clear the request
333: if (!ss.isClosed()) {
334: ss.writeReqList.remove_head();
335: } else {
336: return; // Nothing more to do
337: }
338:
339: } else if (req instanceof AUdpCloseRequest) {
340:
341: AUdpCloseRequest creq = (AUdpCloseRequest) req;
342:
343: // Skip if locked
344: if ((ss.cur_write_req != null)
345: && (ss.cur_write_req != req))
346: break;
347:
348: // OK - by the time we have the lock we can claim the close is done
349: ss.close(creq.compQ);
350:
351: return;
352:
353: } else {
354: throw new IllegalArgumentException(
355: "Invalid incoming request to WriteEventHandler: "
356: + req);
357: }
358: }
359:
360: if (DEBUG)
361: System.err.println("WriteEventHandler: Processed "
362: + num_reqs_processed + " writes in one go");
363: }
364:
365: private void processWriteRequest(aSocketRequest req)
366: throws IOException {
367:
368: if (req instanceof ATcpConnectRequest) {
369:
370: // This registers itself
371: ConnectSockState ss;
372: ss = aSocketMgr.getFactory().newConnectSockState(
373: (ATcpConnectRequest) req, selsource);
374:
375: } else if (req instanceof AUdpConnectRequest) {
376:
377: if (DEBUG)
378: System.err
379: .println("WriteEventHandler: processing AUdpConnectRequest: "
380: + req);
381: AUdpConnectRequest creq = (AUdpConnectRequest) req;
382: AUdpSocket udpsock = creq.sock;
383: udpsock.sockState.connect(creq.addr, creq.port);
384: // only works in jdk1.4
385: // if (DEBUG) System.err.println("connected = " + udpsock.getSocket().isConnected());
386: AUdpConnectEvent ev = new AUdpConnectEvent(udpsock);
387: udpsock.compQ.enqueue_lossy(ev);
388:
389: } else if (req instanceof AUdpDisconnectRequest) {
390:
391: AUdpDisconnectRequest dreq = (AUdpDisconnectRequest) req;
392: AUdpSocket udpsock = dreq.sock;
393: udpsock.getSocket().disconnect();
394: AUdpDisconnectEvent ev = new AUdpDisconnectEvent(udpsock);
395: udpsock.compQ.enqueue_lossy(ev);
396:
397: } else if (req instanceof ATcpWriteRequest) {
398:
399: if (DEBUG)
400: System.err
401: .println("WriteEventHandler: got write request: "
402: + req);
403: SockState ss = ((ATcpWriteRequest) req).conn.sockState;
404:
405: // If already closed, just drop it
406: if (!ss.closed) {
407: if (DEBUG)
408: System.err
409: .println("WriteEventHandler: Adding write req to "
410: + ss);
411:
412: if (!ss.addWriteRequest(req, selsource)) {
413: // Couldn't enqueue: this connection is clogged
414: ATcpWriteRequest wreq = (ATcpWriteRequest) req;
415: SinkIF cq = wreq.buf.getCompletionQueue();
416: if (cq != null) {
417: SinkCloggedEvent sce = new SinkCloggedEvent(
418: wreq.conn, wreq.buf);
419: cq.enqueue_lossy(sce);
420: }
421: } else {
422: if (DEBUG)
423: System.err.println("WriteEventHandler: "
424: + ss.outstanding_writes
425: + " outstanding writes");
426: }
427: }
428:
429: } else if (req instanceof AUdpWriteRequest) {
430:
431: DatagramSockState ss = ((AUdpWriteRequest) req).sock.sockState;
432:
433: // If already closed, just drop it
434: if (!ss.closed) {
435: if (DEBUG)
436: System.err
437: .println("WriteEventHandler: Adding write req to "
438: + ss);
439:
440: if (!ss.addWriteRequest(req, selsource)) {
441: // Couldn't enqueue: this connection is clogged
442: AUdpWriteRequest wreq = (AUdpWriteRequest) req;
443: SinkIF cq = wreq.buf.getCompletionQueue();
444: if (cq != null) {
445: SinkCloggedEvent sce = new SinkCloggedEvent(
446: wreq.sock, wreq.buf);
447: cq.enqueue_lossy(sce);
448: }
449: }
450: }
451:
452: } else if (req instanceof ATcpCloseRequest) {
453:
454: SockState ss = ((ATcpCloseRequest) req).conn.sockState;
455:
456: // If there is no pending outgoing data, do immediate close
457: if (ss.outstanding_writes == 0) {
458: ss.close(((ATcpCloseRequest) req).compQ);
459: } else {
460: // Queue it up
461: ss.addWriteRequest(req, selsource);
462: }
463:
464: } else if (req instanceof ATcpFlushRequest) {
465:
466: SockState ss = ((ATcpFlushRequest) req).conn.sockState;
467: ss.addWriteRequest(req, selsource);
468:
469: } else if (req instanceof AUdpCloseRequest) {
470:
471: DatagramSockState ss = ((AUdpCloseRequest) req).sock.sockState;
472: ss.addWriteRequest(req, selsource);
473:
474: } else if (req instanceof AUdpFlushRequest) {
475:
476: DatagramSockState ss = ((AUdpFlushRequest) req).sock.sockState;
477: ss.addWriteRequest(req, selsource);
478:
479: } else {
480: throw new IllegalArgumentException(
481: "Bad request type to enqueueWrite");
482: }
483: }
484:
485: public void handleEvent(QueueElementIF qel) {
486: if (DEBUG)
487: System.err.println("WriteEventHandler: Got QEL: " + qel);
488:
489: try {
490:
491: if (qel instanceof SelectQueueElement) {
492: Object obj;
493: obj = ((SelectQueueElement) qel).getAttachment();
494:
495: if (obj instanceof ConnectSockState) {
496: processConnection((ConnectSockState) obj);
497: } else {
498: if (qel instanceof SelectQueueElement)
499: ((SelectQueueElement) qel).clearEvents();
500: if (obj instanceof SockState) {
501: processTcpWrite((SockState) obj);
502: } else {
503: processUdpWrite((DatagramSockState) obj);
504: }
505: }
506:
507: } else if (qel instanceof aSocketRequest) {
508: processWriteRequest((aSocketRequest) qel);
509:
510: } else {
511: throw new IllegalArgumentException(
512: "WriteEventHandler: Got unknown event type "
513: + qel);
514: }
515:
516: } catch (Exception e) {
517: System.err
518: .println("WriteEventHandler: Got exception: " + e);
519: e.printStackTrace();
520: }
521: }
522:
523: public void handleEvents(QueueElementIF qelarr[]) {
524: int numWrites = 0;
525:
526: for (int i = 0; i < qelarr.length; i++) {
527:
528: try {
529:
530: QueueElementIF qel = qelarr[i];
531:
532: if (DEBUG)
533: System.err.println("WriteEventHandler: Got QEL: "
534: + qel);
535: if (qel instanceof SelectQueueElement) {
536: Object obj;
537: obj = ((SelectQueueElement) qel).getAttachment();
538: if (DEBUG)
539: System.err.println("!!!obj= " + obj);
540:
541: if (obj instanceof ConnectSockState) {
542: processConnection((ConnectSockState) obj);
543: } else {
544:
545: if ((MAX_WRITES_AT_ONCE == -1)
546: || (numWrites++ < MAX_WRITES_AT_ONCE)) {
547: if (qel instanceof SelectQueueElement)
548: ((SelectQueueElement) qel)
549: .clearEvents();
550: if (obj instanceof SockState) {
551: processTcpWrite((SockState) obj);
552: } else {
553: processUdpWrite((DatagramSockState) obj);
554: }
555: }
556: }
557: } else if (qel instanceof aSocketRequest) {
558: processWriteRequest((aSocketRequest) qel);
559:
560: } else {
561: throw new IllegalArgumentException(
562: "ReadEventHandler: Got unknown event type "
563: + qel);
564: }
565:
566: } catch (Exception e) {
567: System.err.println("WriteEventHandler: Got exception: "
568: + e);
569: e.printStackTrace();
570: }
571: }
572: }
573:
574: }
|