001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.sandStorm.lib.aSocket;
026:
027: import java.util.*;
028: import java.io.*;
029: import java.net.*;
030: import seda.sandStorm.lib.util.MultiByteArrayInputStream;
031:
032: /**
033: * This is a utility class that allows you to push multiple ATcpInPackets
034: * in, and read bytes out as a stream. This is meant to be a convenience
035: * for performing packet processing using the aSocket interfaces.
036: * This class also takes care of reordering packets according to the
037: * ATcpInPacket sequence number; that is, if multiple threads in a stage
038: * are receiving ATcpInPackets for the same connection, the aSocketInputStream
039: * will internally reorder those packets.
040: *
041: * @author Matt Welsh
042: * @see MultiByteArrayInputStream
043: */
044: public class aSocketInputStream extends MultiByteArrayInputStream {
045:
046: private static final boolean DEBUG = false;
047:
048: private TreeSet outoforder;
049: private long nextSeqNum;
050:
051: /**
052: * Create an aSocketInputStream with an initial sequence number of 1.
053: */
054: public aSocketInputStream() {
055: super ();
056: outoforder = new TreeSet(new seqNumComparator());
057: nextSeqNum = 1;
058: }
059:
060: /**
061: * Create an aSocketInputStream using the given initial sequence number.
062: */
063: public aSocketInputStream(long initialSeqNum) {
064: super ();
065: outoforder = new TreeSet(new seqNumComparator());
066: nextSeqNum = initialSeqNum;
067: }
068:
069: // Internal class used to reorder elements of 'outoforder' according
070: // to sequence number
071: class seqNumComparator implements Comparator {
072: public int compare(Object o1, Object o2)
073: throws ClassCastException {
074: ATcpInPacket p1 = (ATcpInPacket) o1;
075: ATcpInPacket p2 = (ATcpInPacket) o2;
076: long sn1 = p1.seqNum;
077: long sn2 = p2.seqNum;
078:
079: if (sn1 == sn2)
080: return 0;
081: else if (sn1 < sn2)
082: return -1;
083: else
084: return 1;
085: }
086: }
087:
088: /**
089: * Add a packet to this aSocketInputStream. Reorders packets internally
090: * so that bytes will be read from this InputStream according to the
091: * sequence number order of the packets.
092: */
093: public synchronized void addPacket(ATcpInPacket pkt) {
094: long sn = pkt.getSequenceNumber();
095: if (sn == 0) {
096: // No sequence number -- assume it's in order, but don't increment
097: // the nextSeqNum
098: addArray(pkt.getBytes());
099: } else if (sn == nextSeqNum) {
100: addArray(pkt.getBytes());
101: nextSeqNum++;
102: // seqNum of 0 is special
103: if (nextSeqNum == 0)
104: nextSeqNum = 1;
105: } else {
106: // Assume out of order. Don't treat (sn < nextSeqNum)
107: // differently than (sn > nextSeqNum), since we have
108: // wraparound.
109:
110: System.out
111: .println("aSocketInputStream : Out of order packet");
112: outoforder.add(pkt);
113:
114: // Push any 'ready' outoforder elements
115: try {
116: ATcpInPacket first = (ATcpInPacket) outoforder.first();
117: while (first != null && first.seqNum == nextSeqNum) {
118: outoforder.remove(first);
119: addArray(first.getBytes());
120: nextSeqNum++;
121: // seqNum of 0 is special
122: if (nextSeqNum == 0)
123: nextSeqNum = 1;
124: first = (ATcpInPacket) outoforder.first();
125: }
126: } catch (NoSuchElementException e) {
127: // Ignore
128: }
129: }
130: }
131:
132: /**
133: * Reinitialize the state of this input stream, clearing all
134: * internal data and pointers. The next sequence number will
135: * be preserved.
136: */
137: public synchronized void clear() {
138: super .clear();
139: outoforder = new TreeSet(new seqNumComparator());
140: }
141:
142: /**
143: * Return the next expected sequence number.
144: */
145: public synchronized long getNextSequenceNumber() {
146: return nextSeqNum;
147: }
148:
149: }
|