001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket.nbio;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.core.*;
029: import seda.sandStorm.lib.aSocket.*;
030: import seda.nbio.*;
031:
032: import java.net.*;
033: import java.io.*;
034: import java.util.*;
035:
036: /**
037: * Internal class used to represent state of a socket while an
038: * outgoing connection is pending.
039: */
040: public class ConnectSockState extends
041: seda.sandStorm.lib.aSocket.ConnectSockState {
042:
043: private static final boolean DEBUG = false;
044: private static final boolean PROFILE = false;
045:
046: private NonblockingSocket nbsock;
047: private SelectItem si;
048: private SelectSource write_selsource;
049:
050: /* private ATcpClientSocket clisock;
051: private SinkIF compQ;
052: private int connectClogTries, connectNumTries;
053: private int writeClogThreshold;
054: boolean completed = false; */
055:
056: public ConnectSockState(ATcpConnectRequest req,
057: SelectSourceIF write_selsource) throws IOException {
058: this (req);
059: this .write_selsource = (SelectSource) write_selsource;
060: si = new SelectItem(nbsock, this , Selectable.CONNECT_READY);
061: write_selsource.register(si);
062: }
063:
064: protected ConnectSockState(ATcpConnectRequest req)
065: throws IOException {
066: this .clisock = req.clisock;
067: this .compQ = req.compQ;
068: this .writeClogThreshold = req.writeClogThreshold;
069: this .connectClogTries = req.connectClogTries;
070: this .connectNumTries = 0;
071: try {
072: nbsock = new NonblockingSocket(req.addr, req.port, false);
073: } catch (IOException ioe) {
074: // Cannot connect
075: compQ
076: .enqueue_lossy(new ATcpConnectFailedEvent(clisock,
077: "Got error trying to connect: "
078: + ioe.getMessage()));
079: return;
080: }
081: }
082:
083: protected void complete() {
084: if (completed)
085: return; // In case we get triggered for complete twice
086: if (DEBUG)
087: System.err.println("completing connections");
088: Object key = null;
089:
090: try {
091:
092: // Do a split-phase enqueue:
093: // First prepare an empty connection, prepare it for enqueue,
094: // then finish the connect...
095: ATcpConnection conn;
096: conn = new ATcpConnection(clisock, nbsock.getInetAddress(),
097: nbsock.getPort());
098:
099: QueueElementIF tmparr[] = new QueueElementIF[1];
100: tmparr[0] = conn;
101:
102: try {
103: key = compQ.enqueue_prepare(tmparr);
104: } catch (SinkException se) {
105: // Whoops - cannot enqueue it
106: if (connectNumTries++ > connectClogTries) {
107: // Can't do it; just drop
108: System.err
109: .println("aSocket: Warning: dropping connect completion in css.complete, sink is clogged");
110: write_selsource.deregister(si);
111: nbsock.close();
112: }
113: // Try again later
114: return;
115: }
116:
117: if (DEBUG)
118: System.err.println("finishing");
119: // Reserved entry, complete connection
120: if (!nbsock.connectDone()) {
121: compQ.enqueue_abort(key);
122: System.err
123: .println("aSocket.CSS.complete: Warning: connectDone returned false!");
124: // Try again later
125: return;
126: }
127:
128: Socket sock = nbsock;
129: if (DEBUG)
130: System.err
131: .println("ConnectSockState: connect finished on "
132: + sock.getInetAddress()
133: .getHostAddress()
134: + ":"
135: + sock.getPort());
136: SockState ss = new SockState(conn, sock, writeClogThreshold);
137: conn.sockState = ss;
138:
139: // Finally enqueue
140: compQ.enqueue_commit(key);
141: completed = true;
142:
143: } catch (IOException ioe) {
144: error(new ATcpConnectFailedEvent(clisock,
145: "Got error trying to connect: " + ioe.getMessage()));
146: if (key != null)
147: compQ.enqueue_abort(key);
148:
149: return;
150: }
151:
152: // Deregister
153: if (DEBUG)
154: System.err
155: .println("WriteThread: CSS.complete: Deregistering si");
156: si.revents = 0;
157: write_selsource.deregister(si);
158: }
159:
160: protected void error(aSocketErrorEvent error) {
161: si.revents = 0;
162: compQ.enqueue_lossy(error);
163: write_selsource.deregister(si);
164: }
165: }
|