001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket.nio;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.core.*;
029: import seda.sandStorm.lib.aSocket.*;
030:
031: import java.net.*;
032: import java.io.*;
033: import java.util.*;
034:
035: import java.nio.*;
036: import java.nio.channels.*;
037:
038: /**
039: * Internal class used to represent state of a socket while an
040: * outgoing connection is pending.
041: */
042: public class ConnectSockState extends
043: seda.sandStorm.lib.aSocket.ConnectSockState {
044:
045: private static final boolean DEBUG = false;
046: private static final boolean PROFILE = false;
047:
048: private SocketChannel nio_sc;
049: private SelectionKey selkey;
050: private NIOSelectSource write_nio_selsource;
051:
052: /* private ATcpClientSocket clisock;
053: private SinkIF compQ;
054: private int connectClogTries, connectNumTries;
055: private int writeClogThreshold;
056: boolean completed = false; */
057:
058: public ConnectSockState(ATcpConnectRequest req,
059: SelectSourceIF write_selsource) throws IOException {
060: this (req);
061: this .write_nio_selsource = (NIOSelectSource) write_selsource;
062: selkey = (SelectionKey) write_selsource.register(nio_sc,
063: SelectionKey.OP_CONNECT);
064: if (DEBUG)
065: System.err.println("Done creating ConnectSockState");
066: selkey.attach(this );
067: }
068:
069: protected ConnectSockState(ATcpConnectRequest req)
070: throws IOException {
071: this .clisock = req.clisock;
072: this .compQ = req.compQ;
073: this .writeClogThreshold = req.writeClogThreshold;
074: this .connectClogTries = req.connectClogTries;
075: this .connectNumTries = 0;
076: try {
077: nio_sc = SocketChannel.open();
078: nio_sc.configureBlocking(false);
079: if (DEBUG)
080: System.err.println("connecting to " + req.addr + ", "
081: + req.port);
082: nio_sc.connect(new InetSocketAddress(req.addr, req.port));
083: } catch (IOException ioe) {
084: // Cannot connect
085: compQ
086: .enqueue_lossy(new ATcpConnectFailedEvent(clisock,
087: "Got error trying to connect: "
088: + ioe.getMessage()));
089: return;
090: }
091: }
092:
093: protected void complete() {
094: if (completed)
095: return; // In case we get triggered for complete twice
096: if (DEBUG)
097: System.err.println("completing connections");
098: Object key = null;
099:
100: try {
101:
102: // Do a split-phase enqueue:
103: // First prepare an empty connection, prepare it for enqueue,
104: // then finish the connect...
105: ATcpConnection conn;
106: if ((!nio_sc.finishConnect()) && DEBUG) {
107: System.err.println("Did not finish connect!!");
108: }
109: conn = new ATcpConnection(clisock, nio_sc.socket()
110: .getInetAddress(), nio_sc.socket().getPort());
111:
112: QueueElementIF tmparr[] = new QueueElementIF[1];
113: tmparr[0] = conn;
114:
115: try {
116: key = compQ.enqueue_prepare(tmparr);
117: } catch (SinkException se) {
118: // Whoops - cannot enqueue it
119: if (connectNumTries++ > connectClogTries) {
120: // Can't do it; just drop
121: System.err
122: .println("aSocket: Warning: dropping connect completion in css.complete, sink is clogged");
123: write_nio_selsource.deregister(selkey);
124: nio_sc.socket().close();
125: }
126: // Try again later
127: return;
128: }
129:
130: if (DEBUG)
131: System.err.println("finishing");
132: // Reserved entry, complete connection
133: if (nio_sc.socket().getRemoteSocketAddress() == null) {
134: compQ.enqueue_abort(key);
135: System.err
136: .println("aSocket.CSS.complete: Warning: connectDone returned false!");
137: // Try again later
138: return;
139: }
140:
141: Socket sock = nio_sc.socket();
142: if (DEBUG)
143: System.err
144: .println("ConnectSockState: connect finished on "
145: + sock.getInetAddress()
146: .getHostAddress()
147: + ":"
148: + sock.getPort());
149: SockState ss = new SockState(conn, sock, writeClogThreshold);
150: conn.sockState = ss;
151:
152: // Finally enqueue
153: compQ.enqueue_commit(key);
154: completed = true;
155:
156: } catch (IOException ioe) {
157: error(new ATcpConnectFailedEvent(clisock,
158: "Got error trying to connect: " + ioe.getMessage()));
159: if (key != null)
160: compQ.enqueue_abort(key);
161:
162: return;
163: }
164:
165: // Deregister
166: if (DEBUG)
167: System.err
168: .println("WriteThread: CSS.complete: Deregistering si");
169: // write_nio_selsource.deregister(selkey);
170: }
171:
172: protected void error(aSocketErrorEvent error) {
173: write_nio_selsource.deregister(selkey);
174: compQ.enqueue_lossy(error);
175: }
176: }
|