001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.catalina.cluster.tcp;
018:
019: import java.net.InetAddress;
020: import java.net.Socket;
021:
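/**
 * <p>Title: SocketSender</p>
 * <p>Description: Blocking {@link IDataSender} that writes each message to a
 * TCP socket and then waits for the receiver's acknowledgement byte. The
 * socket is kept open between sends and is closed once it exceeds the
 * configured keep-alive time or keep-alive request count.</p>
 * <p>Copyright: Copyright (c) 2002</p>
 * <p>Company: </p>
 * @author not attributable
 * @version 1.0
 */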
030:
031: public class SocketSender implements IDataSender {
032:
033: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
034: .getLog(SocketSender.class);
035:
036: private InetAddress address;
037: private int port;
038: private Socket sc = null;
039: private boolean isSocketConnected = false;
040: private boolean suspect;
041: private long ackTimeout = 15 * 1000; //15 seconds socket read timeout (for acknowledgement)
042: private long keepAliveTimeout = 60 * 1000; //keep socket open for no more than one min
043: private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting
044: private long keepAliveConnectTime = 0;
045: private int keepAliveCount = 0;
046:
047: public SocketSender(InetAddress host, int port) {
048: this .address = host;
049: this .port = port;
050: }
051:
052: public InetAddress getAddress() {
053: return address;
054: }
055:
056: public int getPort() {
057: return port;
058: }
059:
060: public void connect() throws java.io.IOException {
061: sc = new Socket(getAddress(), getPort());
062: sc.setSoTimeout((int) ackTimeout);
063: isSocketConnected = true;
064: this .keepAliveCount = 0;
065: this .keepAliveConnectTime = System.currentTimeMillis();
066: }
067:
068: public void disconnect() {
069: try {
070: sc.close();
071: } catch (Exception x) {
072: }
073: isSocketConnected = false;
074: }
075:
076: public boolean isConnected() {
077: return isSocketConnected;
078: }
079:
080: public void checkIfDisconnect() {
081: long ctime = System.currentTimeMillis()
082: - this .keepAliveConnectTime;
083: if ((ctime > this .keepAliveTimeout)
084: || (this .keepAliveCount >= this .keepAliveMaxRequestCount)) {
085: disconnect();
086: }
087: }
088:
089: public void setAckTimeout(long timeout) {
090: this .ackTimeout = timeout;
091: }
092:
093: public long getAckTimeout() {
094: return ackTimeout;
095: }
096:
097: /**
098: * Blocking send
099: * @param data
100: * @throws java.io.IOException
101: */
102: public synchronized void sendMessage(String sessionId, byte[] data)
103: throws java.io.IOException {
104: checkIfDisconnect();
105: if (!isConnected())
106: connect();
107: try {
108: sc.getOutputStream().write(data);
109: sc.getOutputStream().flush();
110: waitForAck(ackTimeout);
111: } catch (java.io.IOException x) {
112: disconnect();
113: connect();
114: sc.getOutputStream().write(data);
115: sc.getOutputStream().flush();
116: waitForAck(ackTimeout);
117: }
118: this .keepAliveCount++;
119: checkIfDisconnect();
120:
121: }
122:
123: private void waitForAck(long timeout) throws java.io.IOException {
124: try {
125: int i = sc.getInputStream().read();
126: while ((i != -1) && (i != 3)) {
127: i = sc.getInputStream().read();
128: }
129: } catch (java.net.SocketTimeoutException x) {
130: log.warn("Wasn't able to read acknowledgement from server["
131: + getAddress() + ":" + getPort() + "] in "
132: + this .ackTimeout + " ms."
133: + " Disconnecting socket, and trying again.");
134: throw x;
135: }
136: }
137:
138: public String toString() {
139: StringBuffer buf = new StringBuffer("SocketSender[");
140: buf.append(getAddress()).append(":").append(getPort()).append(
141: "]");
142: return buf.toString();
143: }
144:
145: public boolean isSuspect() {
146: return suspect;
147: }
148:
149: public boolean getSuspect() {
150: return suspect;
151: }
152:
153: public void setSuspect(boolean suspect) {
154: this .suspect = suspect;
155: }
156:
157: public long getKeepAliveTimeout() {
158: return keepAliveTimeout;
159: }
160:
161: public void setKeepAliveTimeout(long keepAliveTimeout) {
162: this .keepAliveTimeout = keepAliveTimeout;
163: }
164:
165: public int getKeepAliveMaxRequestCount() {
166: return keepAliveMaxRequestCount;
167: }
168:
169: public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
170: this.keepAliveMaxRequestCount = keepAliveMaxRequestCount;
171: }
172:
173: }
|