001: /**
002: * Copyright (C) 2003 Manfred Andres
003: *
004: * This program is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU General Public License
006: * as published by the Free Software Foundation; either version 2
007: * of the License, or (at your option) any later version.
008: *
009: * This program is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
012: * GNU General Public License for more details.
013: *
014: * You should have received a copy of the GNU General Public License
015: * along with this program; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
017: */package freecs.core;
018:
019: import freecs.*;
020: import freecs.content.*;
021: import freecs.interfaces.*;
022: import freecs.util.ObjectBuffer;
023: import java.nio.ByteBuffer;
024: import java.nio.channels.SelectionKey;
025: import java.nio.channels.SocketChannel;
026: import freecs.util.TrafficMonitor;
027: import java.net.InetAddress;
028:
029: /**
030: * gets attached to the keys reading from a nonblocking channel
031: * stores the raw request in a buffer. if the request is finished, parse gets
032: * called which in turn decides which requestobject to use for this requst
033: * and suplies this RequestObject to the next available RequestEvaluator
034: */
035: public class ConnectionBuffer {
036: private volatile User u;
037: private int src;
038: private ByteBuffer buf;
039: private ByteBuffer tBuf = null;
040: public ByteBuffer rBuf = ByteBuffer
041: .allocate(Server.srv.READBUFFER_SIZE);
042: private SelectionKey sk;
043: private String ts;
044: private ObjectBuffer writeBuffer = new ObjectBuffer(
045: Server.srv.INITIAL_RESPONSE_QUEUE);
046: private volatile boolean valid = true;
047: public Connection conn;
048:
049: private StringBuffer lsb = new StringBuffer();
050:
051: private static final int GET = 1;
052: private static final int POST = 2;
053:
054: private int reqType = 0;
055: private int so = 0;
056: private int cStart = -1;
057: private int cLength = -1;
058: public volatile IRequest currentRequest;
059: private boolean reading = false;
060:
061: private volatile long closeWhen = System.currentTimeMillis()
062: + Server.srv.KEEP_ALIVE_TIMEOUT;
063:
064: public ConnectionBuffer(int src) {
065: this .src = src;
066: buf = ByteBuffer.allocate(Server.srv.READBUFFER_SIZE);
067: if (Server.TRACE_CREATE_AND_FINALIZE)
068: Server.log(this ,
069: "++++++++++++++++++++++++++++++++++++++++CREATE",
070: Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
071: }
072:
073: /**
074: * appends to the incomplete request and checks if it has completed
075: * if the request is complete, it will be returned. NULL will be returned
076: * on the other hand.
077: * FIXME: has to get more modular to support different protocols
078: * @return IRequst The full request || null if incomplete
079: */
080: public IRequest append() throws Exception {
081: boolean parse = false;
082: synchronized (this ) {
083: reading = true;
084: rBuf.flip();
085: if (this .buf.remaining() < rBuf.remaining()) {
086: ByteBuffer tbuf = ByteBuffer.allocate(this .buf
087: .position()
088: + rBuf.remaining());
089: this .buf.flip();
090: tbuf.put(this .buf);
091: this .buf = tbuf;
092: }
093: this .buf.put(rBuf);
094: rBuf.clear();
095: if (reqType == 0 && this .buf.position() > 4) {
096: if (this .buf.get(0) == 'P' && this .buf.get(1) == 'O'
097: && this .buf.get(2) == 'S'
098: && this .buf.get(3) == 'T') {
099: reqType = POST;
100: } else if (this .buf.get(0) == 'G'
101: && this .buf.get(1) == 'E'
102: && this .buf.get(2) == 'T') {
103: reqType = GET;
104: } else {
105: this .addLog("HEADER-INVALID");
106: this .invalidate();
107: reading = false;
108: return null;
109: }
110: }
111: if (reqType == GET) {
112: if (this .buf.position() > 4096) {
113: this .addLog("HEADER>4096bytes");
114: this .invalidate();
115: reading = false;
116: return null;
117: }
118: if (this .buf.position() > 10
119: && this .buf.get(this .buf.position() - 4) == '\r'
120: && this .buf.get(this .buf.position() - 3) == '\n'
121: && this .buf.get(this .buf.position() - 2) == '\r'
122: && this .buf.get(this .buf.position() - 1) == '\n') {
123: parse = true;
124: }
125: } else if (reqType == POST) {
126: if (cLength == -1) {
127: for (; so < this .buf.position() - 15; so++) {
128: if (so > 4096
129: || (this .buf.get(so) == '\r'
130: && this .buf.get(so + 1) == '\n'
131: && this .buf.get(so + 2) == '\r' && this .buf
132: .get(so + 3) == '\n')) {
133: this .addLog("HEADER-INVALID");
134: this .invalidate();
135: reading = false;
136: return null;
137: }
138: if (this .buf.get(so) == 'C'
139: && this .buf.get(so + 1) == 'o'
140: && this .buf.get(so + 2) == 'n'
141: && this .buf.get(so + 3) == 't'
142: && this .buf.get(so + 4) == 'e'
143: && this .buf.get(so + 5) == 'n'
144: && this .buf.get(so + 6) == 't'
145: && this .buf.get(so + 7) == '-'
146: && (this .buf.get(so + 8) == 'L' || this .buf
147: .get(so + 8) == 'l')
148: && this .buf.get(so + 9) == 'e'
149: && this .buf.get(so + 10) == 'n'
150: && this .buf.get(so + 11) == 'g'
151: && this .buf.get(so + 12) == 't'
152: && this .buf.get(so + 13) == 'h'
153: && this .buf.get(so + 14) == ':') {
154: int cso = so + 14;
155: if (cso >= this .buf.capacity())
156: return null;
157: while ((this .buf.get(cso) < 48 || this .buf
158: .get(cso) > 57)) {
159: if (cso >= this .buf.capacity())
160: return null;
161: cso++;
162: }
163: StringBuffer sb = new StringBuffer();
164: while (this .buf.get(cso) >= 48
165: && this .buf.get(cso) <= 57) {
166: if (cso >= this .buf.capacity())
167: return null;
168: sb.append((char) this .buf.get(cso));
169: cso++;
170: }
171: so = cso;
172: cLength = Integer.parseInt(sb.toString());
173: break;
174: }
175: }
176: }
177: if (cLength != -1) {
178: for (; cStart == -1 && so < this .buf.position() - 4; so++) {
179: if (so > 4096) {
180: this .addLog("HEADER>4096bytes");
181: this .invalidate();
182: reading = false;
183: return null;
184: }
185: if (this .buf.get(so) == '\r'
186: && this .buf.get(so + 1) == '\n'
187: && this .buf.get(so + 2) == '\r'
188: && this .buf.get(so + 3) == '\n') {
189: cStart = so + 4;
190: break;
191: }
192: }
193: if (cStart != -1) {
194: if ((this .buf.position() - cStart) > cLength) {
195: int diff = this .buf.position() - cStart
196: - cLength;
197: tBuf = ByteBuffer.allocate(diff);
198: for (int pos = this .buf.position() - diff; pos < this .buf
199: .position(); pos++) {
200: tBuf.put(this .buf.get(pos));
201: }
202: this .buf.position(cStart + cLength);
203: parse = true;
204: } else if ((this .buf.position() - cStart) == cLength) {
205: parse = true;
206: }
207: }
208: }
209: }
210: }
211: if (parse)
212: return parse();
213: return null;
214: }
215:
216: /**
217: * hands over this buffer to the requestparser-threads which take care of parsing the request
218: * @return IRequest The IRequest-object containing the request
219: */
220: public IRequest parse() throws Exception {
221: // FIXME: when we install another protocol we have to check here for the type of protocol
222: IRequest req = null;
223: synchronized (this ) {
224: this .buf.flip();
225: try {
226: req = new HTTPRequest(buf, this );
227: } catch (Exception e) {
228: reset();
229: throw e;
230: }
231: reading = false;
232: }
233: try {
234: req.parse();
235: Connection conn = req.getConnectionObject();
236: if (!conn.isDirectlyConnected) {
237: InetAddress ia = ((SocketChannel) sk.channel())
238: .socket().getInetAddress();
239: if (ia != null) {
240: TrafficMonitor.tm.markAsProxy(ia);
241: }
242: }
243: /* } catch (Exception e) {
244: Server.debug (this, "parse: ", e, Server.MSG_ERROR, Server.LVL_MAJOR);
245: throw e; */
246: } finally {
247: reset();
248: }
249: return req;
250: }
251:
252: private synchronized void reset() {
253: if (buf.capacity() != Server.srv.READBUFFER_SIZE) {
254: buf = ByteBuffer.allocate(Server.srv.READBUFFER_SIZE);
255: } else {
256: buf.clear();
257: }
258: if (tBuf != null) {
259: buf.put(tBuf);
260: tBuf = null;
261: }
262: cStart = -1;
263: cLength = -1;
264: reqType = 0;
265: so = 0;
266: valid = true;
267: reading = false;
268: }
269:
270: public void setTemplateSet(String ts) {
271: this .ts = ts;
272: }
273:
274: public String getTemplateSet() {
275: return ts;
276: }
277:
278: public void setUser(User u) {
279: this .u = u;
280: }
281:
282: public User getUser() {
283: return u;
284: }
285:
286: /**
287: * returns the SocketChannel of this requestbuffer
288: */
289: public SelectionKey getKey() {
290: return sk;
291: }
292:
293: public void setKey(SelectionKey sk) {
294: if (!CentralSelector.isSkValid(sk)) {
295: Server.log(this , "setKey: tryed to set invalid key",
296: Server.MSG_STATE, Server.LVL_VERBOSE);
297: return;
298: }
299: this .sk = sk;
300: conn = new Connection(sk);
301: }
302:
303: public void addToWrite(Object ic) {
304: if (!CentralSelector.isSkValid(sk)) {
305: Server.log(this ,
306: "addToWrite: selection-key isn't valid anymore",
307: Server.MSG_STATE, Server.LVL_VERBOSE);
308: return;
309: }
310: synchronized (this ) {
311: if (writeBuffer.isFull()) {
312: int newSize = writeBuffer.capacity()
313: + Server.srv.INITIAL_RESPONSE_QUEUE;
314: if (newSize > Server.srv.MAX_RESPONSE_QUEUE) {
315: Server.log(this ,
316: "addToWrite: write-queue would be bigger than specified for "
317: + toString(), Server.MSG_STATE,
318: Server.LVL_MINOR);
319: return;
320: }
321: Server.log(this ,
322: "addToWrite: Expanding write-queue for "
323: + toString(), Server.MSG_STATE,
324: Server.LVL_MINOR);
325: writeBuffer.resizeTo(newSize);
326: }
327: writeBuffer.put(ic);
328: }
329: writeToLog();
330: Responder.res.addToWrite((SocketChannel) sk.channel(), this );
331: }
332:
333: public ObjectBuffer getWriteQueue() {
334: return writeBuffer;
335: }
336:
337: public void updateKeepAliveTimeout() {
338: if (isMessageFrame)
339: return;
340: closeWhen = System.currentTimeMillis()
341: + Server.srv.KEEP_ALIVE_TIMEOUT;
342: }
343:
344: public long getKeepAliveTimeout(long ts) {
345: if (isMessageFrame || reading)
346: return -1;
347: return closeWhen;
348: }
349:
350: public void invalidate() {
351: valid = false;
352: }
353:
354: public boolean isValid() {
355: return valid;
356: }
357:
358: private volatile boolean isMessageFrame = false;
359:
360: public void setIsMessageFrame(boolean b) {
361: // Server.log("changed state to message-frame-state", Server.MSG_STATE, Server.LVL_MAJOR);
362: isMessageFrame = b;
363: }
364:
365: public void addLog(String str) {
366: lsb.append(" ");
367: lsb.append(str);
368: }
369:
370: public void writeToLog() {
371: if (lsb.length() < 1)
372: return;
373: if (conn != null && conn.peerAddress != null)
374: lsb.insert(0, conn.peerAddress.getHostAddress());
375: else if (conn != null)
376: lsb.insert(0, conn.toString());
377: else
378: lsb.insert(0, "undefined");
379: Server.log("OK", lsb.toString(), Server.MSG_TRAFFIC,
380: Server.LVL_MINOR);
381: lsb = new StringBuffer();
382: }
383:
384: public void logError(String reason) {
385: lsb.append(" REASON: ");
386: lsb.append(reason);
387: if (conn != null && conn.peerAddress != null)
388: lsb.insert(0, conn.peerAddress.getHostAddress());
389: else if (conn != null)
390: lsb.insert(0, conn.toString());
391: else
392: lsb.insert(0, "undefined");
393: Server.log("FAILED", lsb.toString(), Server.MSG_TRAFFIC,
394: Server.LVL_MAJOR);
395: lsb = new StringBuffer();
396: }
397:
398: public void finalize() {
399: if (Server.TRACE_CREATE_AND_FINALIZE)
400: Server
401: .log(
402: this ,
403: "----------------------------------------FINALIZED",
404: Server.MSG_STATE, Server.LVL_VERY_VERBOSE);
405: }
406: }
|