001: //========================================================================
002: //$Id: AbstractConnector.java,v 1.9 2005/11/14 11:00:31 gregwilkins Exp $
003: //Copyright 2004-2005 Mort Bay Consulting Pty. Ltd.
004: //------------------------------------------------------------------------
005: //Licensed under the Apache License, Version 2.0 (the "License");
006: //you may not use this file except in compliance with the License.
007: //You may obtain a copy of the License at
008: //http://www.apache.org/licenses/LICENSE-2.0
009: //Unless required by applicable law or agreed to in writing, software
010: //distributed under the License is distributed on an "AS IS" BASIS,
011: //WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
012: //See the License for the specific language governing permissions and
013: //limitations under the License.
014: //========================================================================
015:
016: package org.mortbay.jetty;
017:
018: import java.io.IOException;
019: import java.net.Socket;
020: import java.util.ArrayList;
021:
022: import org.mortbay.component.AbstractLifeCycle;
023: import org.mortbay.component.LifeCycle;
024: import org.mortbay.io.Buffer;
025: import org.mortbay.io.EndPoint;
026: import org.mortbay.log.Log;
027: import org.mortbay.thread.ThreadPool;
028: import org.mortbay.util.ajax.Continuation;
029: import org.mortbay.util.ajax.WaitingContinuation;
030:
031: /** Abstract Connector implementation.
032: * This abstract implemenation of the Connector interface provides:<ul>
033: * <li>AbstractLifeCycle implementation</li>
034: * <li>Implementations for connector getters and setters</li>
035: * <li>Buffer management</li>
036: * <li>Socket configuration</li>
037: * <li>Base acceptor thread</li>
038: * </ul>
039: *
040: * @author gregw
041: *
042: * TODO - allow multiple Acceptor threads
043: */
044: public abstract class AbstractConnector extends AbstractBuffers
045: implements Connector {
046: private String _name;
047: private Server _server;
048: private ThreadPool _threadPool;
049: private String _host;
050: private int _port = 0;
051: private String _integralScheme = HttpSchemes.HTTPS;
052: private int _integralPort = 0;
053: private String _confidentialScheme = HttpSchemes.HTTPS;
054: private int _confidentialPort = 0;
055: private int _acceptQueueSize = 0;
056: private int _acceptors = 1;
057: private boolean _useDNS;
058:
059: protected int _maxIdleTime = 30000;
060: protected int _lowResourceMaxIdleTime = -1;
061: protected int _soLingerTime = -1;
062:
063: private transient Thread[] _acceptorThread;
064:
065: Object _statsLock = new Object();
066: transient long _statsStartedAt = -1;
067: transient int _requests;
068: transient int _connections; // total number of connections made to server
069:
070: transient int _connectionsOpen; // number of connections currently open
071: transient int _connectionsOpenMin; // min number of connections open simultaneously
072: transient int _connectionsOpenMax; // max number of connections open simultaneously
073:
074: transient long _connectionsDurationMin; // min duration of a connection
075: transient long _connectionsDurationMax; // max duration of a connection
076: transient long _connectionsDurationTotal; // total duration of all coneection
077:
078: transient int _connectionsRequestsMin; // min requests per connection
079: transient int _connectionsRequestsMax; // max requests per connection
080:
081: /* ------------------------------------------------------------------------------- */
082: /**
083: */
084: public AbstractConnector() {
085: }
086:
087: /* ------------------------------------------------------------------------------- */
088: public abstract void open() throws IOException;
089:
090: /* ------------------------------------------------------------------------------- */
091: /*
092: */
093: public Server getServer() {
094: return _server;
095: }
096:
097: /* ------------------------------------------------------------------------------- */
098: public void setServer(Server server) {
099: _server = server;
100: }
101:
102: /* ------------------------------------------------------------------------------- */
103: /*
104: * @see org.mortbay.jetty.HttpListener#getHttpServer()
105: */
106: public ThreadPool getThreadPool() {
107: return _threadPool;
108: }
109:
110: /* ------------------------------------------------------------------------------- */
111: public void setThreadPool(ThreadPool pool) {
112: _threadPool = pool;
113: }
114:
115: /* ------------------------------------------------------------------------------- */
116: /**
117: */
118: public void setHost(String host) {
119: _host = host;
120: }
121:
122: /* ------------------------------------------------------------------------------- */
123: /*
124: */
125: public String getHost() {
126: return _host;
127: }
128:
129: /* ------------------------------------------------------------------------------- */
130: /*
131: * @see org.mortbay.jetty.HttpListener#setPort(int)
132: */
133: public void setPort(int port) {
134: _port = port;
135: }
136:
137: /* ------------------------------------------------------------------------------- */
138: /*
139: * @see org.mortbay.jetty.HttpListener#getPort()
140: */
141: public int getPort() {
142: return _port;
143: }
144:
145: /* ------------------------------------------------------------ */
146: /**
147: * @return Returns the maxIdleTime.
148: */
149: public int getMaxIdleTime() {
150: return _maxIdleTime;
151: }
152:
153: /* ------------------------------------------------------------ */
154: /**
155: * @param maxIdleTime The maxIdleTime to set.
156: */
157: public void setMaxIdleTime(int maxIdleTime) {
158: _maxIdleTime = maxIdleTime;
159: }
160:
161: /* ------------------------------------------------------------ */
162: /**
163: * @return Returns the maxIdleTime.
164: */
165: public int getLowResourceMaxIdleTime() {
166: return _lowResourceMaxIdleTime;
167: }
168:
169: /* ------------------------------------------------------------ */
170: /**
171: * @param maxIdleTime The maxIdleTime to set.
172: */
173: public void setLowResourceMaxIdleTime(int maxIdleTime) {
174: _lowResourceMaxIdleTime = maxIdleTime;
175: }
176:
177: /* ------------------------------------------------------------ */
178: /**
179: * @return Returns the soLingerTime.
180: */
181: public long getSoLingerTime() {
182: return _soLingerTime;
183: }
184:
185: /* ------------------------------------------------------------ */
186: /**
187: * @return Returns the acceptQueueSize.
188: */
189: public int getAcceptQueueSize() {
190: return _acceptQueueSize;
191: }
192:
193: /* ------------------------------------------------------------ */
194: /**
195: * @param acceptQueueSize The acceptQueueSize to set.
196: */
197: public void setAcceptQueueSize(int acceptQueueSize) {
198: _acceptQueueSize = acceptQueueSize;
199: }
200:
201: /* ------------------------------------------------------------ */
202: /**
203: * @return Returns the number of acceptor threads.
204: */
205: public int getAcceptors() {
206: return _acceptors;
207: }
208:
209: /* ------------------------------------------------------------ */
210: /**
211: * @param acceptors The number of acceptor threads to set.
212: */
213: public void setAcceptors(int acceptors) {
214: _acceptors = acceptors;
215: }
216:
217: /* ------------------------------------------------------------ */
218: /**
219: * @param soLingerTime The soLingerTime to set or -1 to disable.
220: */
221: public void setSoLingerTime(int soLingerTime) {
222: _soLingerTime = soLingerTime;
223: }
224:
225: /* ------------------------------------------------------------ */
226: protected void doStart() throws Exception {
227: // open listener port
228: open();
229:
230: super .doStart();
231:
232: if (_threadPool == null)
233: _threadPool = _server.getThreadPool();
234: if (_threadPool != _server.getThreadPool()
235: && (_threadPool instanceof LifeCycle))
236: ((LifeCycle) _threadPool).start();
237:
238: // Start selector thread
239: _acceptorThread = new Thread[getAcceptors()];
240: for (int i = 0; i < _acceptorThread.length; i++) {
241: if (!_threadPool.dispatch(new Acceptor(i))) {
242: Log.warn("insufficient maxThreads configured for {}",
243: this );
244: break;
245: }
246: }
247:
248: Log.info("Started {}", this );
249: }
250:
251: /* ------------------------------------------------------------ */
252: protected void doStop() throws Exception {
253: if (_threadPool == _server.getThreadPool())
254: _threadPool = null;
255: else if (_threadPool instanceof LifeCycle)
256: ((LifeCycle) _threadPool).stop();
257:
258: Thread[] acceptors = _acceptorThread;
259: _acceptorThread = null;
260: if (acceptors != null) {
261: for (int i = 0; i < acceptors.length; i++) {
262: Thread thread = acceptors[i];
263: if (thread != null)
264: thread.interrupt();
265: }
266: }
267:
268: try {
269: close();
270: } catch (IOException e) {
271: Log.warn(e);
272: }
273:
274: super .doStop();
275: }
276:
277: /* ------------------------------------------------------------ */
278: public void join() throws InterruptedException {
279: Thread[] threads = _acceptorThread;
280: if (threads != null)
281: for (int i = 0; i < threads.length; i++)
282: if (threads[i] != null)
283: threads[i].join();
284: }
285:
286: /* ------------------------------------------------------------ */
287: protected void configure(Socket socket) throws IOException {
288: try {
289: socket.setTcpNoDelay(true);
290: if (_maxIdleTime >= 0)
291: socket.setSoTimeout(_maxIdleTime);
292: if (_soLingerTime >= 0)
293: socket.setSoLinger(true, _soLingerTime / 1000);
294: else
295: socket.setSoLinger(false, 0);
296: } catch (Exception e) {
297: Log.ignore(e);
298: }
299: }
300:
301: /* ------------------------------------------------------------ */
302: public void customize(EndPoint endpoint, Request request)
303: throws IOException {
304: }
305:
306: /* ------------------------------------------------------------ */
307: public void persist(EndPoint endpoint) throws IOException {
308: }
309:
310: /* ------------------------------------------------------------ */
311: /*
312: * @see org.mortbay.jetty.Connector#getConfidentialPort()
313: */
314: public int getConfidentialPort() {
315: return _confidentialPort;
316: }
317:
318: /* ------------------------------------------------------------ */
319: /*
320: * @see org.mortbay.jetty.Connector#getConfidentialScheme()
321: */
322: public String getConfidentialScheme() {
323: return _confidentialScheme;
324: }
325:
326: /* ------------------------------------------------------------ */
327: /*
328: * @see org.mortbay.jetty.Connector#isConfidential(org.mortbay.jetty.Request)
329: */
330: public boolean isIntegral(Request request) {
331: return false;
332: }
333:
334: /* ------------------------------------------------------------ */
335: /*
336: * @see org.mortbay.jetty.Connector#getConfidentialPort()
337: */
338: public int getIntegralPort() {
339: return _integralPort;
340: }
341:
342: /* ------------------------------------------------------------ */
343: /*
344: * @see org.mortbay.jetty.Connector#getIntegralScheme()
345: */
346: public String getIntegralScheme() {
347: return _integralScheme;
348: }
349:
350: /* ------------------------------------------------------------ */
351: /*
352: * @see org.mortbay.jetty.Connector#isConfidential(org.mortbay.jetty.Request)
353: */
354: public boolean isConfidential(Request request) {
355: return false;
356: }
357:
358: /* ------------------------------------------------------------ */
359: /**
360: * @param confidentialPort The confidentialPort to set.
361: */
362: public void setConfidentialPort(int confidentialPort) {
363: _confidentialPort = confidentialPort;
364: }
365:
366: /* ------------------------------------------------------------ */
367: /**
368: * @param confidentialScheme The confidentialScheme to set.
369: */
370: public void setConfidentialScheme(String confidentialScheme) {
371: _confidentialScheme = confidentialScheme;
372: }
373:
374: /* ------------------------------------------------------------ */
375: /**
376: * @param integralPort The integralPort to set.
377: */
378: public void setIntegralPort(int integralPort) {
379: _integralPort = integralPort;
380: }
381:
382: /* ------------------------------------------------------------ */
383: /**
384: * @param integralScheme The integralScheme to set.
385: */
386: public void setIntegralScheme(String integralScheme) {
387: _integralScheme = integralScheme;
388: }
389:
390: /* ------------------------------------------------------------ */
391: public Continuation newContinuation() {
392: return new WaitingContinuation();
393: }
394:
395: /* ------------------------------------------------------------ */
396: protected abstract void accept(int acceptorID) throws IOException,
397: InterruptedException;
398:
399: /* ------------------------------------------------------------ */
400: public void stopAccept(int acceptorID) throws Exception {
401: }
402:
403: /* ------------------------------------------------------------ */
404: public boolean getResolveNames() {
405: return _useDNS;
406: }
407:
408: /* ------------------------------------------------------------ */
409: public void setResolveNames(boolean resolve) {
410: _useDNS = resolve;
411: }
412:
413: /* ------------------------------------------------------------ */
414: public String toString() {
415: String name = this .getClass().getName();
416: int dot = name.lastIndexOf('.');
417: if (dot > 0)
418: name = name.substring(dot + 1);
419:
420: return name + " @ "
421: + (getHost() == null ? "0.0.0.0" : getHost()) + ":"
422: + (getLocalPort() <= 0 ? getPort() : getLocalPort());
423: }
424:
425: /* ------------------------------------------------------------ */
426: /* ------------------------------------------------------------ */
427: /* ------------------------------------------------------------ */
428: private class Acceptor implements Runnable {
429: int _acceptor = 0;
430:
431: Acceptor(int id) {
432: _acceptor = id;
433: }
434:
435: /* ------------------------------------------------------------ */
436: public void run() {
437: Thread current = Thread.currentThread();
438: _acceptorThread[_acceptor] = current;
439: String name = _acceptorThread[_acceptor].getName();
440: current.setName(name + " - Acceptor" + _acceptor + " "
441: + AbstractConnector.this );
442: Log.debug("Starting " + this );
443: try {
444: current.setPriority(current.getPriority() - 1);
445: while (isRunning()) {
446: try {
447: accept(_acceptor);
448: } catch (EofException e) {
449: Log.ignore(e);
450: } catch (IOException e) {
451: Log.ignore(e);
452: } catch (Exception e) {
453: Log.warn(e);
454: }
455: }
456: } finally {
457: Log.debug("Stopping " + this );
458: current.setPriority(current.getPriority() + 1);
459: current.setName(name);
460: try {
461: if (_acceptor == 0)
462: close();
463: } catch (IOException e) {
464: Log.warn(e);
465: }
466: }
467: }
468: }
469:
470: public String getName() {
471: if (_name == null)
472: _name = (getHost() == null ? "0.0.0.0" : getHost())
473: + ":"
474: + (getLocalPort() <= 0 ? getPort() : getLocalPort());
475: return _name;
476: }
477:
478: public void setName(String name) {
479: _name = name;
480: }
481:
482: /* ------------------------------------------------------------ */
483: /**
484: * @return Get the number of requests handled by this context
485: * since last call of statsReset(). If setStatsOn(false) then this
486: * is undefined.
487: */
488: public int getRequests() {
489: return _requests;
490: }
491:
492: /* ------------------------------------------------------------ */
493: /**
494: * @return Returns the connectionsDurationMin.
495: */
496: public long getConnectionsDurationMin() {
497: return _connectionsDurationMin;
498: }
499:
500: /* ------------------------------------------------------------ */
501: /**
502: * @return Returns the connectionsDurationTotal.
503: */
504: public long getConnectionsDurationTotal() {
505: return _connectionsDurationTotal;
506: }
507:
508: /* ------------------------------------------------------------ */
509: /**
510: * @return Returns the connectionsOpenMin.
511: */
512: public int getConnectionsOpenMin() {
513: return _connectionsOpenMin;
514: }
515:
516: /* ------------------------------------------------------------ */
517: /**
518: * @return Returns the connectionsRequestsMin.
519: */
520: public int getConnectionsRequestsMin() {
521: return _connectionsRequestsMin;
522: }
523:
524: /* ------------------------------------------------------------ */
525: /**
526: * @return Number of connections accepted by the server since
527: * statsReset() called. Undefined if setStatsOn(false).
528: */
529: public int getConnections() {
530: return _connections;
531: }
532:
533: /* ------------------------------------------------------------ */
534: /**
535: * @return Number of connections currently open that were opened
536: * since statsReset() called. Undefined if setStatsOn(false).
537: */
538: public int getConnectionsOpen() {
539: return _connectionsOpen;
540: }
541:
542: /* ------------------------------------------------------------ */
543: /**
544: * @return Maximum number of connections opened simultaneously
545: * since statsReset() called. Undefined if setStatsOn(false).
546: */
547: public int getConnectionsOpenMax() {
548: return _connectionsOpenMax;
549: }
550:
551: /* ------------------------------------------------------------ */
552: /**
553: * @return Average duration in milliseconds of open connections
554: * since statsReset() called. Undefined if setStatsOn(false).
555: */
556: public long getConnectionsDurationAve() {
557: return _connections == 0 ? 0
558: : (_connectionsDurationTotal / _connections);
559: }
560:
561: /* ------------------------------------------------------------ */
562: /**
563: * @return Maximum duration in milliseconds of an open connection
564: * since statsReset() called. Undefined if setStatsOn(false).
565: */
566: public long getConnectionsDurationMax() {
567: return _connectionsDurationMax;
568: }
569:
570: /* ------------------------------------------------------------ */
571: /**
572: * @return Average number of requests per connection
573: * since statsReset() called. Undefined if setStatsOn(false).
574: */
575: public int getConnectionsRequestsAve() {
576: return _connections == 0 ? 0 : (_requests / _connections);
577: }
578:
579: /* ------------------------------------------------------------ */
580: /**
581: * @return Maximum number of requests per connection
582: * since statsReset() called. Undefined if setStatsOn(false).
583: */
584: public int getConnectionsRequestsMax() {
585: return _connectionsRequestsMax;
586: }
587:
588: /* ------------------------------------------------------------ */
589: /** Reset statistics.
590: */
591: public void statsReset() {
592: _statsStartedAt = _statsStartedAt == -1 ? -1 : System
593: .currentTimeMillis();
594:
595: _connections = 0;
596:
597: _connectionsOpenMin = _connectionsOpen;
598: _connectionsOpenMax = _connectionsOpen;
599: _connectionsOpen = 0;
600:
601: _connectionsDurationMin = 0;
602: _connectionsDurationMax = 0;
603: _connectionsDurationTotal = 0;
604:
605: _requests = 0;
606:
607: _connectionsRequestsMin = 0;
608: _connectionsRequestsMax = 0;
609: }
610:
611: /* ------------------------------------------------------------ */
612: public void setStatsOn(boolean on) {
613: if (on && _statsStartedAt != -1)
614: return;
615: Log.info("Statistics on = " + on + " for " + this );
616: statsReset();
617: _statsStartedAt = on ? System.currentTimeMillis() : -1;
618: }
619:
620: /* ------------------------------------------------------------ */
621: /**
622: * @return True if statistics collection is turned on.
623: */
624: public boolean getStatsOn() {
625: return _statsStartedAt != -1;
626: }
627:
628: /* ------------------------------------------------------------ */
629: /**
630: * @return Timestamp stats were started at.
631: */
632: public long getStatsOnMs() {
633: return (_statsStartedAt != -1) ? (System.currentTimeMillis() - _statsStartedAt)
634: : 0;
635: }
636:
637: /* ------------------------------------------------------------ */
638: protected void connectionOpened(HttpConnection connection) {
639: if (_statsStartedAt == -1)
640: return;
641: synchronized (_statsLock) {
642: _connectionsOpen++;
643: if (_connectionsOpen > _connectionsOpenMax)
644: _connectionsOpenMax = _connectionsOpen;
645: }
646: }
647:
648: /* ------------------------------------------------------------ */
649: protected void connectionClosed(HttpConnection connection) {
650: if (_statsStartedAt >= 0) {
651: synchronized (_statsLock) {
652: int requests = connection.getRequests();
653: _requests += requests;
654: long duration = System.currentTimeMillis()
655: - connection.getTimeStamp();
656: _connections++;
657: _connectionsOpen--;
658: _connectionsDurationTotal += duration;
659: if (_connectionsOpen < 0)
660: _connectionsOpen = 0;
661: if (_connectionsOpen < _connectionsOpenMin)
662: _connectionsOpenMin = _connectionsOpen;
663: if (_connectionsDurationMin == 0
664: || duration < _connectionsDurationMin)
665: _connectionsDurationMin = duration;
666: if (duration > _connectionsDurationMax)
667: _connectionsDurationMax = duration;
668: if (_connectionsRequestsMin == 0
669: || requests < _connectionsRequestsMin)
670: _connectionsRequestsMin = requests;
671: if (requests > _connectionsRequestsMax)
672: _connectionsRequestsMax = requests;
673: }
674: }
675:
676: connection.destroy();
677: }
678:
679: }
|