001: /*
002: * Copyright (c) 2000 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.core.*;
029: import java.io.*;
030: import java.net.*;
031:
032: /**
033: * An AUdpSocket implements an asynchronous datagram socket. Applications
034: * create an AUdpSocket and associate a SinkIF with it. Packets received
035: * on the socket will be pushed onto the SinkIF as AUdpInPacket objects.
036: * The AUdpSocket can also be used to send messages to the socket, and to
037: * associate a default send address using the connect() method.
038: *
039: * @author Matt Welsh
040: * @see AUdpInPacket
041: */
042: public class AUdpSocket extends SimpleSink {
043:
044: /** The default maximum packet size read by the socket. */
045: public static final int DEFAULT_MAX_PACKETSIZE = 16384;
046:
047: public int maxPacketSize, writeClogThreshold;
048: public SinkIF compQ;
049: InetAddress localaddress, remaddress;
050: int localport, remport;
051:
052: private boolean readerstarted = false;
053: private boolean closed = false;
054:
055: // Internal DatagramSockState associated with this connection
056: DatagramSockState sockState;
057:
058: /**
059: * Create a socket bound to any available local port. This is mainly
060: * used to create outgoing-only sockets.
061: */
062: public AUdpSocket(SinkIF compQ) throws IOException {
063: this (null, 0, compQ, DEFAULT_MAX_PACKETSIZE, -1);
064: }
065:
066: /**
067: * Create a socket bound to the given local port.
068: */
069: public AUdpSocket(int localport, SinkIF compQ) throws IOException {
070: this (null, localport, compQ, DEFAULT_MAX_PACKETSIZE, -1);
071: }
072:
073: /**
074: * Create a socket bound to the given local address and local port.
075: *
076: * @param maxPacketSize The maximum size, in bytes, of packets that
077: * this socket will attempt to receive. The default is
078: * DEFAULT_MAX_PACKETSIZE, which is 16 KBytes.
079: *
080: * @param writeClogThreshold The maximum number of outstanding writes
081: * on this socket before a SinkCloggedEvent is pushed to the
082: * connection's completion queue. This is effectively the maximum depth
083: * threshold for this connection's SinkIF. The default value is -1, which
084: * indicates that no SinkCloggedEvents will be generated.
085: *
086: */
087: public AUdpSocket(InetAddress localaddr, int localport,
088: SinkIF compQ, int maxPacketSize, int writeClogThreshold)
089: throws IOException {
090: this .remaddress = null;
091: this .remport = -1;
092: this .maxPacketSize = maxPacketSize;
093: this .writeClogThreshold = writeClogThreshold;
094: this .compQ = compQ;
095:
096: // Needed for getFactory() to work. Can't just call init() from
097: // getFactory() as initializing aSocketMgr requires a recursive
098: // call.
099: aSocketMgr.init();
100: this .sockState = aSocketMgr.getFactory().newDatagramSockState(
101: this , localaddr, localport);
102: }
103:
104: /**
105: * Associate a SinkIF with this socket and allow data
106: * to start flowing into it. When data is read, AUdpInPacket objects
107: * will be pushed into the given SinkIF. If this queue is full,
108: * the socket will attempt to allow packets to queue up in the O/S
109: * network stack (i.e. by not issuing further read calls on the
110: * socket). Until this method is called, no data will be read from
111: * the socket.
112: */
113: public void startReader(SinkIF receiveQ) {
114: startReader(receiveQ, -1);
115: }
116:
117: /**
118: * Associate a SinkIF with this socket and allow data
119: * to start flowing into it. When data is read, AUdpInPacket objects
120: * will be pushed into the given SinkIF. If this queue is full,
121: * the socket will attempt to allow packets to queue up in the O/S
122: * network stack (i.e. by not issuing further read calls on the
123: * socket). Until this method is called, no data will be read from
124: * the socket.
125: *
126: * @param readClogTries The number of times the aSocket layer will
127: * attempt to push an incoming packet onto the given SinkIF while the
128: * SinkIF is full. The queue entry will be dropped after this many
129: * tries. The default value is -1, which indicates that the aSocket
130: * layer will attempt to push the queue entry indefinitely.
131: */
132: public void startReader(SinkIF receiveQ, int readClogTries) {
133: if (readerstarted)
134: throw new IllegalArgumentException(
135: "startReader already called on this socket");
136: aSocketMgr.enqueueRequest(new AUdpStartReadRequest(this ,
137: receiveQ, readClogTries));
138: readerstarted = true;
139: }
140:
141: /**
142: * Enqueue an outgoing packet to be written to this socket.
143: * The packet must be of type BufferElement or AUdpPacket.
144: */
145: public void enqueue(QueueElementIF packet) throws SinkException {
146: if (closed)
147: throw new SinkClosedException("AUdpSocket closed");
148: if (packet == null)
149: throw new BadQueueElementException(
150: "AUdpSocket.enqueue got null element", packet);
151: aSocketMgr.enqueueRequest(new AUdpWriteRequest(this ,
152: (BufferElement) packet));
153: }
154:
155: /**
156: * Enqueue an outgoing packet to be written to this socket.
157: * The packet must be of type BufferElement or AUdpPacket. Drops the packet
158: * if it cannot be enqueued.
159: */
160: public boolean enqueue_lossy(QueueElementIF packet) {
161: if (closed)
162: return false;
163: if (packet == null)
164: return false;
165: aSocketMgr.enqueueRequest(new AUdpWriteRequest(this ,
166: (BufferElement) packet));
167: return true;
168: }
169:
170: /**
171: * Enqueue an set of outgoing packets to this socket.
172: * Each packet must be of type BufferElement or AUdpPacket.
173: */
174: public void enqueue_many(QueueElementIF packets[])
175: throws SinkException {
176: if (closed)
177: throw new SinkClosedException("AUdpSocket closed");
178: for (int i = 0; i < packets.length; i++) {
179: if (packets[i] == null)
180: throw new BadQueueElementException(
181: "AUdpSocket.enqueue_many got null element",
182: packets[i]);
183: aSocketMgr.enqueueRequest(new AUdpWriteRequest(this ,
184: (BufferElement) packets[i]));
185: }
186: }
187:
188: /**
189: * Close the socket. A SinkClosedEvent will be posted on the given
190: * compQ when the close is complete.
191: */
192: public void close(SinkIF compQ) throws SinkClosedException {
193: if (closed)
194: throw new SinkClosedException("AUdpSocket closed");
195: closed = true;
196: aSocketMgr.enqueueRequest(new AUdpCloseRequest(this , compQ));
197: }
198:
199: /**
200: * Flush the socket. A SinkFlushedEvent will be posted on the given
201: * compQ when the close is complete.
202: */
203: public void flush(SinkIF compQ) throws SinkClosedException {
204: if (closed)
205: throw new SinkClosedException("AUdpSocket closed");
206: aSocketMgr.enqueueRequest(new AUdpFlushRequest(this , compQ));
207: }
208:
209: /**
210: * Returns the number of elements currently waiting in the sink.
211: */
212: public int size() {
213: if (sockState == null)
214: return 0;
215: if (sockState.writeReqList == null)
216: return 0; // If closed
217: else
218: return sockState.writeReqList.size();
219: }
220:
221: /**
222: * Returns the next sequence number for packets arriving on this
223: * socket. Returns 0 if this socket is not active.
224: * Note that this method may return an <b>inaccurate</b> sequence
225: * number since the call is not synchronized with new message
226: * arrivals that may increment the sequence number.
227: */
228: public long getSequenceNumber() {
229: if (sockState == null)
230: return 0;
231: return sockState.seqNum;
232: }
233:
234: /**
235: * Returns the profile size of this connection.
236: */
237: public int profileSize() {
238: return size();
239: }
240:
241: /**
242: * Asynchronously connect this socket to the given port. All send
243: * requests enqueued after this given connect call will use the
244: * given address and port as the default address. An AUdpConnectEvent
245: * will be pushed to the user when the connect has completed.
246: */
247: public void connect(InetAddress addr, int port) {
248: aSocketMgr.enqueueRequest(new AUdpConnectRequest(this , addr,
249: port));
250: }
251:
252: /**
253: * Asynchronously disconnect this socket from the given port.
254: * An AUdpDisconnectEvent will be enqueued to the user when the
255: * disconnect has completed. If this socket is not connected
256: * then an AUdpDisconnectEvent will be pushed to the user regardless.
257: */
258: public void disconnect() {
259: aSocketMgr.enqueueRequest(new AUdpDisconnectRequest(this ));
260: }
261:
262: /**
263: * Return the InetAddress that this socket is connected to; returns
264: * null if not connected.
265: */
266: public InetAddress getAddress() {
267: return sockState.getSocket().getInetAddress();
268: }
269:
270: /**
271: * Return the port that this socket is connected to; returns -1 if
272: * not connected.
273: */
274: public int getPort() {
275: return sockState.getSocket().getPort();
276: }
277:
278: /**
279: * Return the local InetAddress for this socket.
280: */
281: public InetAddress getLocalAddress() {
282: return sockState.getSocket().getLocalAddress();
283: }
284:
285: /**
286: * Return the local port for this socket.
287: */
288: public int getLocalPort() {
289: return sockState.getSocket().getLocalPort();
290: }
291:
292: public DatagramSocket getSocket() {
293: return sockState.getSocket();
294: }
295:
296: }
|