001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.api.internal.*;
029: import seda.sandStorm.core.*;
030: import java.io.*;
031: import java.net.*;
032:
033: /**
034: * An ATcpConnection represents an established connection on an asynchronous
035: * socket. It is used to send outgoing packets over the connection, and
036: * to initiate packet reads from the connection. When a packet arrives on
037: * this connection, an ATcpInPacket object will be pushed to the SinkIF
038: * specified by the startReader() call. The ATcpInPacket will contain a
039: * pointer to this ATcpConnection. This object also allows the connection
040: * to be flushed or closed.
041: *
042: * @author Matt Welsh
043: * @see ATcpInPacket
044: *
045: */
046: public class ATcpConnection extends SimpleSink implements
047: QueueElementIF {
048:
049: private InetAddress address;
050: private int port;
051: private boolean closed;
052: private boolean readerstarted;
053:
054: private ATcpClientSocket clientSocket;
055: private ATcpServerSocket serverSocket;
056:
057: // Internal SockState associated with this connection
058: public SockState sockState;
059:
060: /**
061: * The application may use this field to associate some
062: * application-specific state with this connection. The aSocket
063: * layer will not read or modify this field in any way.
064: */
065: public Object userTag;
066:
067: private ATcpConnection(InetAddress address, int port) {
068: this .address = address;
069:
070: this .port = port;
071: this .closed = false;
072: }
073:
074: public ATcpConnection(ATcpClientSocket cliSock,
075: InetAddress address, int port) {
076: this (address, port);
077: this .clientSocket = cliSock;
078: }
079:
080: public ATcpConnection(ATcpServerSocket servSock,
081: InetAddress address, int port) {
082: this (address, port);
083: this .serverSocket = servSock;
084: }
085:
086: protected ATcpConnection() {
087: }
088:
089: /**
090: * Return the address of the peer.
091: */
092: public InetAddress getAddress() {
093: return address;
094: }
095:
096: /**
097: * Return the port of the peer.
098: */
099: public int getPort() {
100: return port;
101: }
102:
103: /**
104: * Return the ATcpServerSocket from which this connection came.
105: * Returns null if this connection resulted from an ATcpClientSocket.
106: */
107: public ATcpServerSocket getServerSocket() {
108: return serverSocket;
109: }
110:
111: /**
112: * Return the ATcpClientSocket from which this connection came.
113: * Returns null if this connection resulted from an ATcpServerSocket.
114: */
115: public ATcpClientSocket getClientSocket() {
116: return clientSocket;
117: }
118:
119: /**
120: * Associate a SinkIF with this connection and allow data
121: * to start flowing into it. When data is read, ATcpInPacket objects
122: * will be pushed into the given SinkIF. If this sink is full,
123: * the connection will attempt to allow packets to queue up in the O/S
124: * network stack (i.e. by not issuing further read calls on the
125: * socket). Until this method is called, no data will be read from
126: * the socket.
127: */
128: public void startReader(SinkIF receiveQ) {
129: if (readerstarted)
130: throw new IllegalArgumentException(
131: "startReader already called on this connection");
132: aSocketMgr.enqueueRequest(new ATcpStartReadRequest(this ,
133: receiveQ, -1));
134: readerstarted = true;
135: }
136:
137: /**
138: * Associate a SinkIF with this connection and allow data
139: * to start flowing into it. When data is read, ATcpInPacket objects
140: * will be pushed into the given SinkIF. If this queue is full,
141: * the connection will attempt to allow packets to queue up in the O/S
142: * network stack (i.e. by not issuing further read calls on the
143: * socket). Until this method is called, no data will be read from
144: * the socket.
145: *
146: * @param readClogTries The number of times the aSocket layer will
147: * attempt to push a new entry onto the given SinkIF while the
148: * SinkIF is full. The queue entry will be dropped after this many
149: * tries. The default value is -1, which indicates that the aSocket
150: * layer will attempt to push the queue entry indefinitely.
151: */
152: public void startReader(SinkIF receiveQ, int readClogTries) {
153: if (readerstarted)
154: throw new IllegalArgumentException(
155: "startReader already called on this connection");
156: aSocketMgr.enqueueRequest(new ATcpStartReadRequest(this ,
157: receiveQ, readClogTries));
158: readerstarted = true;
159: }
160:
161: /**
162: * Enqueue an outgoing packet to be written to this socket.
163: */
164: public void enqueue(QueueElementIF buf) throws SinkException {
165: if (closed)
166: throw new SinkClosedException("ATcpConnection closed");
167: if (buf == null)
168: throw new BadQueueElementException(
169: "ATcpConnection.enqueue got null element", buf);
170: aSocketMgr.enqueueRequest(new ATcpWriteRequest(this ,
171: (BufferElement) buf));
172: }
173:
174: /**
175: * Enqueue an outgoing packet to be written to this socket.
176: * Drops the packet if it cannot be enqueued.
177: */
178: public boolean enqueue_lossy(QueueElementIF buf) {
179: if (closed)
180: return false;
181: if (buf == null)
182: return false;
183: aSocketMgr.enqueueRequest(new ATcpWriteRequest(this ,
184: (BufferElement) buf));
185: return true;
186: }
187:
188: /**
189: * Enqueue a set of outgoing packets to be written to this socket.
190: */
191: public void enqueue_many(QueueElementIF bufarr[])
192: throws SinkException {
193: if (closed)
194: throw new SinkClosedException("ATcpConnection closed");
195:
196: for (int i = 0; i < bufarr.length; i++) {
197: if (bufarr[i] == null)
198: throw new BadQueueElementException(
199: "ATcpConnection.enqueue_many got null element",
200: bufarr[i]);
201: aSocketMgr.enqueueRequest(new ATcpWriteRequest(this ,
202: (BufferElement) bufarr[i]));
203: }
204: }
205:
206: /**
207: * Close the socket. A SinkClosedEvent will be posted on the given
208: * compQ when the close is complete.
209: */
210: public void close(SinkIF compQ) throws SinkClosedException {
211: if (closed)
212: throw new SinkClosedException("ATcpConnection closed");
213: closed = true;
214: aSocketMgr.enqueueRequest(new ATcpCloseRequest(this , compQ));
215: }
216:
217: /**
218: * Flush the socket. A SinkFlushedEvent will be posted on the given
219: * compQ when the close is complete.
220: */
221: public void flush(SinkIF compQ) throws SinkClosedException {
222: if (closed)
223: throw new SinkClosedException("ATcpConnection closed");
224: aSocketMgr.enqueueRequest(new ATcpFlushRequest(this , compQ));
225: }
226:
227: /**
228: * Returns the number of elements currently waiting in the sink.
229: */
230: public int size() {
231: if (sockState == null)
232: return 0;
233: if (sockState.writeReqList == null)
234: return 0; // If closed
235: else
236: return sockState.writeReqList.size();
237: }
238:
239: /**
240: * Returns the next sequence number for packets arriving on this
241: * connection. Returns 0 if this connection is not active.
242: * Note that this method may return an <b>inaccurate</b> sequence
243: * number since the call is not synchronized with new message
244: * arrivals that may increment the sequence number.
245: */
246: public long getSequenceNumber() {
247: if (sockState == null)
248: return 0;
249: return sockState.seqNum;
250: }
251:
252: /**
253: * Returns the profile size of this connection.
254: */
255: public int profileSize() {
256: return size();
257: }
258:
259: public String toString() {
260: return "ATcpConnection [" + address.getHostAddress() + ":"
261: + port + "/" + sockState + "/" + clientSocket + "]";
262: //return "ATcpConnection ["+address.getHostAddress()+"]";
263: }
264:
265: }
|