001: /*
002: * GNetWatch
003: * Copyright 2006, 2007 Alexandre Fenyo
004: * gnetwatch@fenyo.net
005: *
006: * This file is part of GNetWatch.
007: *
008: * GNetWatch is free software; you can redistribute it and/or modify
009: * it under the terms of the GNU General Public License as published by
010: * the Free Software Foundation; either version 2 of the License, or
011: * (at your option) any later version.
012: *
013: * GNetWatch is distributed in the hope that it will be useful,
014: * but WITHOUT ANY WARRANTY; without even the implied warranty of
015: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
016: * GNU General Public License for more details.
017: *
018: * You should have received a copy of the GNU General Public License
019: * along with GNetWatch; if not, write to the Free Software
020: * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
021: */
022:
023: package net.fenyo.gnetwatch.actions;
024:
025: import net.fenyo.gnetwatch.*;
026: import net.fenyo.gnetwatch.actions.Action.InterruptCause;
027: import net.fenyo.gnetwatch.activities.Background;
028: import net.fenyo.gnetwatch.data.*;
029: import net.fenyo.gnetwatch.targets.*;
030:
031: import java.io.*;
032: import java.net.*;
033: import javax.net.ssl.*;
034: import java.util.Arrays;
035: import java.util.Date;
036:
037: import org.apache.commons.logging.Log;
038: import org.apache.commons.logging.LogFactory;
039:
040: /**
041: * Instances of this action class can load any HTTP server
042: * and create events of type EventHTTP to log the throughput.
043: * @author Alexandre Fenyo
044: * @version $Id: ActionHTTP.java,v 1.8 2007/03/12 05:04:14 fenyo Exp $
045: */
046:
047: public class ActionHTTP extends Action {
048: private static Log log = LogFactory.getLog(ActionHTTP.class);
049:
050: private boolean interrupted = false;
051:
052: private String error_string = "";
053:
054: /**
055: * Constructor.
056: * @param target target this action works on.
057: * @param background queue manager by which this action will add events.
058: */
059: // GUI thread
060: // supports any thread
061: public ActionHTTP(final Target target, final Background background) {
062: super (target, background);
063: setItem("http");
064: }
065:
066: /**
067: * Constructor.
068: * @param none.
069: */
070: // GUI thread
071: // supports any thread
072: public ActionHTTP() {
073: setItem("http");
074: }
075:
076: /**
077: * Returns the associated target.
078: * @param none.
079: * @return Target associated target.
080: */
081: // any thread
082: public String getQueueName() {
083: return "http";
084: }
085:
086: /**
087: * Returns the timeout associated with this action.
088: * @param none.
089: * @return long timeout.
090: */
091: // any thread
092: // bug : au bout de ce tps en ms ca s'arrete
093: public long getMaxDelay() {
094: return 30000000;
095: }
096:
097: /**
098: * Asks this action to stop rapidely.
099: * @param cause cause.
100: * @return void.
101: * @throws IOException IO exception.
102: */
103: // main & Background threads
104: // supports any thread
105: public void interrupt(final InterruptCause reason) {
106: interrupted = true;
107: }
108:
109: /**
110: * Establishes the connections to the server.
111: * @param idx number of connections to establish.
112: * @param querier http/ftp parameters.
113: * @param connections array of connections established.
114: * @param streams streams associated to the connections.
115: * @param sizes data sizes ready to be read on the connections.
116: * @param url url to connect to.
117: * @param proxy proxy to use.
118: * @return number of bytes received.
119: * @throws IOException IO exception.
120: */
121: private int connect(final int idx, final IPQuerier querier,
122: final URLConnection[] connections,
123: final InputStream[] streams, final int[] sizes,
124: final URL url, final Proxy proxy) throws IOException {
125: error_string = "";
126: try {
127: connections[idx] = querier.getUseProxy() ? url
128: .openConnection(proxy) : url.openConnection();
129: connections[idx].setUseCaches(false);
130: connections[idx].connect();
131: streams[idx] = connections[idx].getInputStream();
132: sizes[idx] = connections[idx].getContentLength();
133:
134: } catch (final IOException ex) {
135:
136: streams[idx] = null;
137: sizes[idx] = 0;
138:
139: int response_code = 0;
140: try {
141: response_code = ((HttpURLConnection) connections[idx])
142: .getResponseCode();
143: } catch (final ConnectException ex2) {
144: getGUI().appendConsole(ex2.toString() + "<BR/>");
145: try {
146: Thread.sleep(1000);
147: } catch (final InterruptedException ex3) {
148: }
149:
150: throw ex2;
151: }
152:
153: error_string = "(http error " + response_code + ")";
154: final InputStream error_stream = ((HttpURLConnection) connections[idx])
155: .getErrorStream();
156: if (error_stream == null)
157: return 0;
158: int nread, nread_tot = 0;
159: String error_str = "";
160: final byte[] error_buf = new byte[65536];
161: while ((nread = error_stream.read(error_buf)) > 0) {
162: // log.debug("error: " + new String(error_buf).substring(0, nread - 1));
163: error_str += new String(error_buf);
164: nread_tot += nread;
165: }
166: error_stream.close();
167: return nread_tot;
168: }
169: return 0;
170: }
171:
172: /**
173: * Loads the server.
174: * @param none.
175: * @return void.
176: * @throws IOException IO exception.
177: * @throws InterruptedException exception.
178: * @see http://java.sun.com/j2se/1.5.0/docs/guide/net/http-keepalive.html
179: */
180: // Queue thread
181: // supports any thread
182: public void invoke() throws IOException, InterruptedException {
183: if (isDisposed() == true)
184: return;
185:
186: try {
187: super .invoke();
188:
189: final IPQuerier querier;
190: if (TargetIPv4.class.isInstance(getTarget())) {
191: querier = ((TargetIPv4) getTarget()).getIPQuerier();
192: } else if (TargetIPv6.class.isInstance(getTarget())) {
193: querier = ((TargetIPv6) getTarget()).getIPQuerier();
194: } else
195: return;
196:
197: final URL url = new URL(querier.getURL());
198: final Proxy proxy = querier.getUseProxy() ? new Proxy(
199: Proxy.Type.HTTP, new InetSocketAddress(querier
200: .getProxyHost(), querier.getProxyPort()))
201: : null;
202:
203: URLConnection[] connections = new URLConnection[querier
204: .getNParallel()];
205: InputStream[] streams = new InputStream[querier
206: .getNParallel()];
207: int[] sizes = new int[querier.getNParallel()];
208:
209: for (int idx = 0; idx < querier.getNParallel(); idx++)
210: connect(idx, querier, connections, streams, sizes, url,
211: proxy);
212:
213: final byte[] buf = new byte[65536];
214: long last_time = System.currentTimeMillis();
215: int bytes_received = 0;
216: int pages_received = 0;
217:
218: while (true) {
219: int available_for_every_connections = 0;
220:
221: for (int idx = 0; idx < querier.getNParallel(); idx++) {
222: final int available = (streams[idx] != null) ? streams[idx]
223: .available()
224: : 0;
225: available_for_every_connections += available;
226:
227: if (available == 0) {
228: if (sizes[idx] == 0) {
229: if (streams[idx] != null)
230: streams[idx].close();
231: bytes_received += connect(idx, querier,
232: connections, streams, sizes, url,
233: proxy);
234: pages_received++;
235: }
236: } else {
237: final int nread = streams[idx].read(buf);
238: switch (nread) {
239: case -1:
240: streams[idx].close();
241: connect(idx, querier, connections, streams,
242: sizes, url, proxy);
243: pages_received++;
244: break;
245:
246: case 0:
247: log.error("0 byte read");
248: for (InputStream foo : streams)
249: if (foo != null)
250: foo.close();
251: return;
252:
253: default:
254: // log.debug("read: " + new String(buf).substring(0, nread - 1));
255: bytes_received += nread;
256: sizes[idx] -= nread;
257: }
258: }
259:
260: if (System.currentTimeMillis() - last_time > 1000) {
261: getTarget().addEvent(
262: new EventHTTP(bytes_received));
263: getTarget().addEvent(
264: new EventHTTPPages(pages_received));
265:
266: setDescription(""
267: + new Double(
268: ((double) 8 * 1000 * bytes_received)
269: / (System
270: .currentTimeMillis() - last_time))
271: .intValue()
272: + " bit/s ("
273: + new Double(
274: ((double) 1000 * pages_received)
275: / (System
276: .currentTimeMillis() - last_time))
277: .intValue() + " pages/sec)");
278: getGUI().setStatus(
279: getGUI().getConfig().getPattern(
280: "bytes_http",
281: bytes_received,
282: querier.getAddress().toString()
283: .substring(1))
284: + " " + error_string);
285:
286: last_time = System.currentTimeMillis();
287: bytes_received = 0;
288: pages_received = 0;
289: }
290:
291: if (interrupted == true) {
292: for (InputStream foo : streams)
293: if (foo != null)
294: foo.close();
295: return;
296: }
297: }
298: if (available_for_every_connections == 0)
299: Thread.sleep(10);
300: }
301: } catch (final InterruptedException ex) {
302: log.error("Exception", ex);
303: }
304: }
305:
306: /**
307: * Called when this element is being removed.
308: * @param none.
309: * @return void.
310: */
311: protected void disposed() {
312: // remove us from the flood queue
313: super .disposed();
314:
315: // interrupt if currently running
316: interrupt(InterruptCause.removed);
317: }
318: }
|