001: /*
002: * Copyright (c) 2001 by Matt Welsh and The Regents of the University of
003: * California. All rights reserved.
004: *
005: * Permission to use, copy, modify, and distribute this software and its
006: * documentation for any purpose, without fee, and without written agreement is
007: * hereby granted, provided that the above copyright notice and the following
008: * two paragraphs appear in all copies of this software.
009: *
010: * IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR
011: * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT
012: * OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF
013: * CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
014: *
015: * THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES,
016: * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
017: * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
018: * ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO
019: * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
020: *
021: * Author: Matt Welsh <mdw@cs.berkeley.edu>
022: *
023: */
024:
025: package seda.apps.Haboob.http;
026:
027: import seda.sandStorm.api.*;
028: import seda.sandStorm.core.*;
029: import seda.sandStorm.lib.http.*;
030: import seda.apps.Haboob.*;
031: import java.util.Hashtable;
032:
033: /**
034: * This stage simply forwards httpResponders to the appropriate
035: * httpConnection. Note that when ENQUEUE_RESPONSES is false (the
036: * default), then this stage is not actually used -- rather, the
037: * sendResponse() call directly implements forwarding.
038: */
039: public class HttpSend implements EventHandlerIF, HaboobConst {
040:
041: private static final boolean DEBUG = false;
042:
043: // If true, handle sends from this stage - otherwise inline into
044: // sendResponse() call
045: private static final boolean ENQUEUE_RESPONSES = false;
046:
047: private static SinkIF mysink, cacheSink;
048: private static Hashtable respTable;
049: private static int maxReqs;
050:
051: private int ServerPort;
052:
053: public HttpSend() {
054:
055: }
056:
057: public void init(ConfigDataIF config) throws Exception {
058: mysink = config.getStage().getSink();
059: cacheSink = config.getManager().getStage(CACHE_STAGE).getSink();
060: maxReqs = config.getInt("maxRequests");
061: if (maxReqs != -1)
062: respTable = new Hashtable();
063:
064: ServerPort = config.getInt("ServerPort");
065: if (ServerPort == -1) {
066: throw new IllegalArgumentException(
067: "Must specify ServerPort");
068: } else {
069: HaboobStats.addSender("" + ServerPort, this );
070: }
071: }
072:
073: public void destroy() {
074: }
075:
076: // Library call used to send HTTP response
077: public static void sendResponse(httpResponder resp) {
078: if (ENQUEUE_RESPONSES) {
079: mysink.enqueue_lossy(resp);
080: } else {
081: // Inline the call
082: doResponse(resp);
083: }
084: }
085:
086: private static final void doResponse(httpResponder resp) {
087: if (DEBUG)
088: System.err.println("HttpSend: Got response " + resp);
089:
090: if (resp.getResponse().getServerPort() > 0)
091: HaboobStats.getReceiver(
092: "" + resp.getResponse().getServerPort())
093: .doneWithReq();
094:
095: if (!resp.getConnection().enqueue_lossy(resp)) {
096: // This is OK if we have already closed the connection
097: if (DEBUG)
098: System.err
099: .println("HttpSend: Could not enqueue response "
100: + resp.getResponse()
101: + " to connection "
102: + resp.getConnection());
103: return;
104: }
105:
106: if (resp.shouldClose()) {
107: HaboobStats.numConnectionsClosed++;
108: httpConnection conn = resp.getConnection();
109: try {
110: conn.close(cacheSink);
111: if (maxReqs != -1)
112: respTable.remove(conn);
113: } catch (SinkClosedException sce) {
114: if (DEBUG)
115: System.err
116: .println("Warning: Tried to close connection "
117: + conn + " multiple times");
118: }
119: return;
120: }
121:
122: if (maxReqs != -1) {
123: httpConnection conn = resp.getConnection();
124: Integer count = (Integer) respTable.remove(conn);
125: if (count == null) {
126: respTable.put(conn, new Integer(0));
127: } else {
128: int prevConns = HaboobStats.numConnectionsEstablished
129: - HaboobStats.numConnectionsClosed;
130: int c = count.intValue();
131: c++;
132: if (c >= maxReqs) {
133: HaboobStats.getReceiver(
134: "" + resp.getResponse().getServerPort())
135: .closeConnection(conn);
136: } else {
137: respTable.put(conn, new Integer(c));
138: }
139: }
140: }
141: }
142:
143: public void handleEvent(QueueElementIF item) {
144: if (DEBUG)
145: System.err.println("HttpSend: GOT QEL: " + item);
146:
147: if (item instanceof httpResponder) {
148: doResponse((httpResponder) item);
149:
150: } else if (item instanceof SinkClosedEvent) {
151: // Connection closed by remote peer
152: SinkClosedEvent sce = (SinkClosedEvent) item;
153: httpConnection hc = (httpConnection) sce.sink;
154: if (maxReqs != -1)
155: respTable.remove(hc);
156:
157: } else {
158: System.err.println("HttpSend: Got unknown event type: "
159: + item);
160: }
161:
162: }
163:
164: public void handleEvents(QueueElementIF items[]) {
165: for (int i = 0; i < items.length; i++) {
166: handleEvent(items[i]);
167: }
168: }
169:
170: /**
171: * Inner class representing the number of requests on a given
172: * connection.
173: */
174: class requestCount {
175: int reqs;
176: }
177:
178: }
|