001: /*
002: * Copyright (c) 1998-2008 Caucho Technology -- all rights reserved
003: *
004: * This file is part of Resin(R) Open Source
005: *
006: * Each copy or derived work must preserve the copyright notice and this
007: * notice unmodified.
008: *
009: * Resin Open Source is free software; you can redistribute it and/or modify
010: * it under the terms of the GNU General Public License as published by
011: * the Free Software Foundation; either version 2 of the License, or
012: * (at your option) any later version.
013: *
014: * Resin Open Source is distributed in the hope that it will be useful,
015: * but WITHOUT ANY WARRANTY; without even the implied warranty of
016: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE, or any warranty
017: * of NON-INFRINGEMENT. See the GNU General Public License for more
018: * details.
019: *
020: * You should have received a copy of the GNU General Public License
021: * along with Resin Open Source; if not, write to the
022: *
023: * Free Software Foundation, Inc.
024: * 59 Temple Place, Suite 330
025: * Boston, MA 02111-1307 USA
026: *
027: * @author Scott Ferguson
028: */
029:
030: package com.caucho.server.port;
031:
032: import com.caucho.loader.Environment;
033: import com.caucho.management.server.AbstractManagedObject;
034: import com.caucho.management.server.TcpConnectionMXBean;
035: import com.caucho.server.connection.*;
036: import com.caucho.server.resin.Resin;
037: import com.caucho.util.Alarm;
038: import com.caucho.util.ThreadPool;
039: import com.caucho.util.ThreadTask;
040: import com.caucho.vfs.ClientDisconnectException;
041: import com.caucho.vfs.QSocket;
042: import com.caucho.vfs.ReadStream;
043:
044: import java.io.IOException;
045: import java.net.InetAddress;
046: import java.util.logging.Level;
047: import java.util.logging.Logger;
048:
049: /**
050: * A protocol-independent TcpConnection. TcpConnection controls the
051: * TCP Socket and provides buffered streams.
052: *
053: * <p>Each TcpConnection has its own thread.
054: */
055: public class TcpConnection extends PortConnection implements ThreadTask {
056: private static final Logger log = Logger
057: .getLogger(TcpConnection.class.getName());
058:
059: private final QSocket _socket;
060:
061: private boolean _isInUse;
062: private boolean _isActive;
063: private boolean _isClosed;
064:
065: private boolean _isKeepalive;
066: private boolean _isResume;
067: private boolean _isWake;
068: private boolean _isDead;
069:
070: private final Object _requestLock = new Object();
071:
072: private final String _id;
073: private String _dbgId;
074: private String _name;
075:
076: private boolean _isSecure; // port security overrisde
077:
078: private final Admin _admin = new Admin();
079:
080: private String _state = "unknown";
081:
082: private long _connectionStartTime;
083: private long _requestStartTime;
084:
085: private long _suspendTime;
086:
087: private Thread _thread;
088:
089: /**
090: * Creates a new TcpConnection.
091: *
092: * @param server The TCP server controlling the connections
093: * @param request The protocol Request
094: */
095: TcpConnection(Port port, QSocket socket) {
096: setPort(port);
097:
098: _isSecure = port.isSecure();
099:
100: int id = getId();
101:
102: String protocol = port.getProtocol().getProtocolName();
103:
104: if (port.getAddress() == null) {
105: Resin resin = Resin.getLocal();
106: String serverId = resin != null ? resin.getServerId()
107: : null;
108: if (serverId == null)
109: serverId = "";
110:
111: _id = protocol + "-" + serverId + "-" + port.getPort()
112: + "-" + id;
113: _name = protocol + "-" + port.getPort() + "-" + id;
114: } else {
115: _id = (protocol + "-" + port.getAddress() + ":"
116: + port.getPort() + "-" + id);
117: _name = (protocol + "-" + port.getAddress() + "-"
118: + port.getPort() + "-" + id);
119: }
120:
121: _socket = socket;
122: }
123:
124: /**
125: * Returns the object name for jmx.
126: */
127: public String getName() {
128: return _name;
129: }
130:
131: public long getSuspendTime() {
132: return _suspendTime;
133: }
134:
135: /**
136: * Initialize the socket.
137: */
138: public void initSocket() throws IOException {
139: _isClosed = false;
140: _isInUse = true;
141: _isKeepalive = false;
142:
143: getWriteStream().init(_socket.getStream());
144: getReadStream().init(_socket.getStream(), getWriteStream());
145:
146: if (log.isLoggable(Level.FINE)) {
147: Port port = getPort();
148:
149: if (port != null)
150: log.fine(dbgId() + "starting connection " + this
151: + ", total=" + port.getConnectionCount());
152: else
153: log.fine(dbgId() + "starting connection " + this );
154: }
155: }
156:
157: /**
158: * Returns the connection's socket
159: */
160: public QSocket getSocket() {
161: return _socket;
162: }
163:
164: /**
165: * Returns the connection's socket
166: */
167: public QSocket startSocket() {
168: _isClosed = false;
169:
170: return _socket;
171: }
172:
173: /**
174: * Try to read nonblock
175: */
176: private boolean waitForKeepalive() throws IOException {
177: Port port = getPort();
178: QSocket socket = _socket;
179:
180: if (port.isClosed())
181: return false;
182:
183: ReadStream is = getReadStream();
184:
185: if (getReadStream().getBufferAvailable() > 0)
186: return true;
187:
188: long timeout = port.getKeepaliveTimeout();
189:
190: boolean isSelectManager = port.getServer()
191: .isSelectManagerEnabled();
192:
193: if (isSelectManager) {
194: timeout = port.getKeepaliveSelectThreadTimeout();
195: }
196:
197: if (timeout > 0 && timeout < port.getSocketTimeout()) {
198: port.keepaliveThreadBegin();
199:
200: try {
201: boolean isKeepalive = is.fillWithTimeout(timeout);
202:
203: return isKeepalive;
204: } finally {
205: port.keepaliveThreadEnd();
206: }
207: } else if (isSelectManager)
208: return false;
209: else
210: return true;
211: }
212:
213: public boolean isSecure() {
214: if (_isClosed)
215: return false;
216: else
217: return _socket.isSecure() || _isSecure;
218: }
219:
220: /**
221: * Returns true for closed.
222: */
223: public boolean isClosed() {
224: return _isClosed;
225: }
226:
227: /**
228: * Set true for active.
229: */
230: public void setActive(boolean isActive) {
231: _isActive = isActive;
232: }
233:
234: /**
235: * Returns true for active.
236: */
237: public boolean isActive() {
238: return _isActive;
239: }
240:
241: /**
242: * Sets the keepalive state. Called only by the SelectManager and Port.
243: */
244: public void setKeepalive() {
245: if (_isKeepalive)
246: log
247: .warning("illegal state: setting keepalive with active keepalive: "
248: + this );
249:
250: _isKeepalive = true;
251: }
252:
253: /**
254: * Sets the keepalive state. Called only by the SelectManager and Port.
255: */
256: public void clearKeepalive() {
257: if (!_isKeepalive)
258: log
259: .warning("illegal state: clearing keepalive with inactive keepalive: "
260: + this );
261:
262: _isKeepalive = false;
263: }
264:
265: /**
266: * Returns the local address of the socket.
267: */
268: public InetAddress getLocalAddress() {
269: // The extra cases handle Kaffe problems.
270: try {
271: return _socket.getLocalAddress();
272: } catch (Exception e) {
273: try {
274: return InetAddress.getLocalHost();
275: } catch (Exception e1) {
276: try {
277: return InetAddress.getByName("127.0.0.1");
278: } catch (Exception e2) {
279: return null;
280: }
281: }
282: }
283: }
284:
285: /**
286: * Returns the socket's local TCP port.
287: */
288: public int getLocalPort() {
289: return _socket.getLocalPort();
290: }
291:
292: /**
293: * Returns the socket's remote address.
294: */
295: public InetAddress getRemoteAddress() {
296: return _socket.getRemoteAddress();
297: }
298:
299: /**
300: * Returns the socket's remote host name.
301: */
302: public String getRemoteHost() {
303: return _socket.getRemoteHost();
304: }
305:
306: /**
307: * Adds from the socket's remote address.
308: */
309: public int getRemoteAddress(byte[] buffer, int offset, int length) {
310: return _socket.getRemoteAddress(buffer, offset, length);
311: }
312:
313: /**
314: * Returns the socket's remote port
315: */
316: public int getRemotePort() {
317: return _socket.getRemotePort();
318: }
319:
320: /**
321: * Returns the virtual host.
322: */
323: public String getVirtualHost() {
324: return getPort().getVirtualHost();
325: }
326:
327: /**
328: * Returns the state string.
329: */
330: public final String getState() {
331: return _state;
332: }
333:
334: /**
335: * Sets the state string.
336: */
337: public final void setState(String state) {
338: _state = state;
339: }
340:
341: /**
342: * Begins an active connection.
343: */
344: public final void beginActive() {
345: _state = "active";
346: _requestStartTime = Alarm.getCurrentTime();
347: }
348:
349: /**
350: * Ends an active connection.
351: */
352: public final void endActive() {
353: _state = "idle";
354: _requestStartTime = 0;
355: }
356:
357: /**
358: * Returns the thread id.
359: */
360: public final long getThreadId() {
361: Thread thread = _thread;
362:
363: if (thread != null)
364: return thread.getId();
365: else
366: return -1;
367: }
368:
369: /**
370: * Returns the time the current request has taken.
371: */
372: public final long getRequestActiveTime() {
373: if (_requestStartTime > 0)
374: return Alarm.getCurrentTime() - _requestStartTime;
375: else
376: return -1;
377: }
378:
379: public boolean allowKeepalive() {
380: return getPort().allowKeepalive(_connectionStartTime);
381: }
382:
383: /**
384: * Tries to mark the connection as a keepalive connection
385: *
386: * At exit, the connection is either:
387: * 1) freed (no keepalive)
388: * 2) rescheduled (keepalive with new thread)
389: * 3) in select pool (keepalive with poll)
390: */
391: private void keepalive() {
392: Port port = getPort();
393:
394: ConnectionController controller = getController();
395:
396: if (controller != null) {
397: // comet suspension
398: _suspendTime = Alarm.getCurrentTime();
399:
400: if (port.suspend(this )) {
401: if (log.isLoggable(Level.FINE))
402: log.fine(dbgId() + " suspend");
403: } else {
404: if (log.isLoggable(Level.FINE))
405: log.fine(dbgId() + " suspend fail");
406:
407: free();
408: }
409: } else if (!port.keepaliveBegin(this , _connectionStartTime)) {
410: free();
411: } else if (port.getSelectManager() != null) {
412: if (port.getSelectManager().keepalive(this )) {
413: if (log.isLoggable(Level.FINE))
414: log.fine(dbgId() + "keepalive (select)");
415: } else {
416: // XXX: s/b
417: // setKeepalive();
418: // ThreadPool.schedule(this);
419: log.warning(dbgId() + "failed keepalive (select)");
420:
421: port.keepaliveEnd(this );
422: free();
423: }
424: } else {
425: if (log.isLoggable(Level.FINE))
426: log.fine(dbgId() + "keepalive (thread)");
427:
428: setKeepalive();
429: ThreadPool.getThreadPool().schedule(this );
430: }
431: }
432:
433: void setResume() {
434: _isResume = true;
435: _isWake = false;
436: _suspendTime = 0;
437: }
438:
439: void setWake() {
440: _isWake = true;
441: }
442:
443: boolean isWake() {
444: return _isWake;
445: }
446:
447: boolean isComet() {
448: ConnectionController controller = getController();
449:
450: return controller != null && !controller.isClosed();
451: }
452:
453: /**
454: * Wakes the connection (comet-style).
455: */
456: protected boolean wake() {
457: ConnectionController controller = getController();
458:
459: if (controller != null) {
460: _isWake = true;
461:
462: if (getPort().resume(this )) {
463: log.fine(dbgId() + "wake");
464: return true;
465: } else {
466: log.fine(dbgId() + "wake failed");
467: return false;
468: }
469: } else
470: return false;
471: }
472:
473: /**
474: * Starts the connection.
475: */
476: public void start() {
477: Thread thread = Thread.currentThread();
478: ClassLoader oldLoader = thread.getContextClassLoader();
479: }
480:
481: /**
482: * Runs as a task.
483: */
484: public void run() {
485: Port port = getPort();
486:
487: boolean isKeepalive = _isKeepalive;
488: _isKeepalive = false;
489:
490: boolean isResume = _isResume;
491: _isResume = false;
492: _isWake = false;
493:
494: boolean isFirst = !isKeepalive;
495:
496: ServerRequest request = getRequest();
497: boolean isWaitForRead = request.isWaitForRead();
498:
499: Thread thread = Thread.currentThread();
500: String oldThreadName = thread.getName();
501:
502: thread.setName(_id);
503:
504: if (isKeepalive)
505: port.keepaliveEnd(this );
506:
507: port.threadBegin(this );
508:
509: ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
510:
511: long startTime = Alarm.getExactTime();
512:
513: thread.setContextClassLoader(systemLoader);
514:
515: //_admin.register();
516:
517: try {
518: _thread = thread;
519:
520: if (isResume) {
521: ConnectionController controller = getController();
522:
523: if (request.handleResume()) {
524: isKeepalive = true;
525: } else {
526: if (controller != null)
527: controller.close();
528:
529: isKeepalive = false;
530:
531: closeImpl();
532: }
533:
534: return;
535: }
536:
537: while (!_isDead) {
538: if (isKeepalive) {
539: } else if (port.accept(this , isFirst))
540: _connectionStartTime = Alarm.getCurrentTime();
541: else
542: return;
543:
544: isFirst = false;
545: ConnectionController controller = null;
546:
547: try {
548: thread.interrupted();
549: // clear the interrupted flag
550:
551: do {
552: thread.setContextClassLoader(systemLoader);
553:
554: controller = null;
555:
556: isKeepalive = false;
557:
558: if (!port.isClosed()
559: && (!isWaitForRead || getReadStream()
560: .waitForRead())) {
561:
562: synchronized (_requestLock) {
563: isKeepalive = request.handleRequest();
564: }
565:
566: controller = getController();
567: if (controller != null
568: && controller.isActive()) {
569: isKeepalive = true;
570: return;
571: }
572: /* XXX: else if (isKeepalive) check
573: else
574: controller.close();
575: */
576: }
577: } while (isKeepalive && waitForKeepalive()
578: && !port.isClosed());
579:
580: if (isKeepalive) {
581: return;
582: } else {
583: getRequest().protocolCloseEvent();
584: }
585: } catch (ClientDisconnectException e) {
586: isKeepalive = false;
587:
588: if (controller != null)
589: controller.close();
590:
591: if (log.isLoggable(Level.FINER))
592: log.finer(dbgId() + e);
593: } catch (IOException e) {
594: isKeepalive = false;
595:
596: if (controller != null)
597: controller.close();
598:
599: if (log.isLoggable(Level.FINE))
600: log.log(Level.FINE, dbgId() + e, e);
601: } finally {
602: thread.setContextClassLoader(systemLoader);
603:
604: if (!isKeepalive)
605: closeImpl();
606: }
607: }
608: } catch (Throwable e) {
609: log.log(Level.WARNING, e.toString(), e);
610: isKeepalive = false;
611: } finally {
612: thread.setContextClassLoader(systemLoader);
613:
614: //_admin.unregister();
615:
616: port.threadEnd(this );
617: if (isKeepalive) {
618: keepalive();
619: } else
620: free();
621:
622: _thread = null;
623: thread.setName(oldThreadName);
624: }
625: }
626:
627: /**
628: * Sends a broadcast request.
629: */
630: public void sendBroadcast(BroadcastTask task) {
631: synchronized (_requestLock) {
632: task.execute(this );
633: }
634: }
635:
636: /**
637: * Closes on shutdown.
638: */
639: public void closeOnShutdown() {
640: QSocket socket = _socket;
641:
642: if (socket != null) {
643: try {
644: socket.close();
645: } catch (Throwable e) {
646: log.log(Level.FINE, e.toString(), e);
647: }
648:
649: Thread.currentThread().yield();
650: }
651: }
652:
653: /**
654: * Closes the connection.
655: */
656: private void closeImpl() {
657: QSocket socket = _socket;
658: boolean isClosed;
659:
660: synchronized (this ) {
661: isClosed = _isClosed;
662: _isClosed = true;
663: }
664:
665: getPort().detach(this );
666:
667: ConnectionController controller = getController();
668: if (controller != null)
669: controller.close();
670:
671: if (!isClosed) {
672: _isActive = false;
673: boolean isKeepalive = _isKeepalive;
674: _isKeepalive = false;
675:
676: Port port = getPort();
677:
678: if (isKeepalive) {
679: port.keepaliveEnd(this );
680: }
681:
682: if (log.isLoggable(Level.FINE) && _isInUse) {
683: if (port != null)
684: log.fine(dbgId() + "closing connection " + this
685: + ", total=" + port.getConnectionCount());
686: else
687: log.fine(dbgId() + "closing connection " + this );
688: }
689:
690: _isInUse = false;
691: _isWake = false;
692: _isResume = false;
693:
694: try {
695: getWriteStream().close();
696: } catch (Throwable e) {
697: log.log(Level.FINE, e.toString(), e);
698: }
699:
700: try {
701: getReadStream().close();
702: } catch (Throwable e) {
703: log.log(Level.FINE, e.toString(), e);
704: }
705:
706: if (socket != null) {
707: try {
708: socket.close();
709: } catch (Throwable e) {
710: log.log(Level.FINE, e.toString(), e);
711: }
712:
713: getPort().closeSocket(socket);
714: }
715: }
716: }
717:
718: /**
719: * Closes the controller.
720: */
721: protected void closeControllerImpl() {
722: getPort().resume(this );
723: }
724:
725: /**
726: * Destroys the connection()
727: */
728: public final void destroy() {
729: _isDead = true;
730:
731: closeImpl();
732: }
733:
734: /**
735: * Frees the connection()
736: */
737: final void free() {
738: closeImpl();
739:
740: setState("free");
741:
742: if (!_isDead)
743: getPort().free(this );
744: else
745: getPort().kill(this );
746: }
747:
748: protected String dbgId() {
749: if (_dbgId == null) {
750: Object serverId = Environment
751: .getAttribute("caucho.server-id");
752: String prefix = "";
753:
754: if (serverId != null)
755: _dbgId = "Tcp[" + serverId + "," + getId() + "] ";
756: else
757: _dbgId = "Tcp[" + getId() + "] ";
758: }
759:
760: return _dbgId;
761: }
762:
763: public String toString() {
764: if (_isActive)
765: return "TcpConnection[id=" + _id + ",socket=" + _socket
766: + ",active]";
767: else
768: return "TcpConnection[id=" + _id + ",socket=" + _socket
769: + ",port=" + getPort() + "]";
770: }
771:
772: class Admin extends AbstractManagedObject implements
773: TcpConnectionMXBean {
774: Admin() {
775: super (ClassLoader.getSystemClassLoader());
776: }
777:
778: public String getName() {
779: return _name;
780: }
781:
782: public long getThreadId() {
783: return TcpConnection.this .getThreadId();
784: }
785:
786: public long getRequestActiveTime() {
787: return TcpConnection.this .getRequestActiveTime();
788: }
789:
790: public String getState() {
791: return TcpConnection.this .getState();
792: }
793:
794: void register() {
795: registerSelf();
796: }
797:
798: void unregister() {
799: unregisterSelf();
800: }
801: }
802: }
|