001: /*
002: * Copyright 1999,2004 The Apache Software Foundation.
003: *
004: * Licensed under the Apache License, Version 2.0 (the "License");
005: * you may not use this file except in compliance with the License.
006: * You may obtain a copy of the License at
007: *
008: * http://www.apache.org/licenses/LICENSE-2.0
009: *
010: * Unless required by applicable law or agreed to in writing, software
011: * distributed under the License is distributed on an "AS IS" BASIS,
012: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013: * See the License for the specific language governing permissions and
014: * limitations under the License.
015: */
016:
017: package org.apache.catalina.cluster.tcp;
018:
019: import java.net.InetAddress;
020: import java.net.Socket;
021: import java.util.LinkedList;
022: import java.util.List;
023: import java.util.Collections;
024:
025: /**
026: * <p>Title: </p>
027: * <p>Description: </p>
028: * <p>Copyright: Copyright (c) 2002</p>
029: * <p>Company: </p>
030: * @author not attributable
031: * @version 1.0
032: */
033:
034: public class PooledSocketSender implements IDataSender {
035:
036: private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
037: .getLog(org.apache.catalina.cluster.CatalinaCluster.class);
038:
039: private InetAddress address;
040: private int port;
041: private Socket sc = null;
042: private boolean isSocketConnected = true;
043: private boolean suspect;
044: private long ackTimeout = 15 * 1000; //15 seconds socket read timeout (for acknowledgement)
045: private long keepAliveTimeout = 60 * 1000; //keep socket open for no more than one min
046: private int keepAliveMaxRequestCount = 100; //max 100 requests before reconnecting
047: private long keepAliveConnectTime = 0;
048: private int keepAliveCount = 0;
049: private int maxPoolSocketLimit = 25;
050:
051: private SenderQueue senderQueue = null;
052:
053: public PooledSocketSender(InetAddress host, int port) {
054: this .address = host;
055: this .port = port;
056: senderQueue = new SenderQueue(this , maxPoolSocketLimit);
057: }
058:
059: public InetAddress getAddress() {
060: return address;
061: }
062:
063: public int getPort() {
064: return port;
065: }
066:
067: public void connect() throws java.io.IOException {
068: //do nothing, happens in the socket sender itself
069: senderQueue.open();
070: isSocketConnected = true;
071: }
072:
073: public void disconnect() {
074: senderQueue.close();
075: isSocketConnected = false;
076: }
077:
078: public boolean isConnected() {
079: return isSocketConnected;
080: }
081:
082: public void setAckTimeout(long timeout) {
083: this .ackTimeout = timeout;
084: }
085:
086: public long getAckTimeout() {
087: return ackTimeout;
088: }
089:
090: public void setMaxPoolSocketLimit(int limit) {
091: maxPoolSocketLimit = limit;
092: }
093:
094: public int getMaxPoolSocketLimit() {
095: return maxPoolSocketLimit;
096: }
097:
098: /**
099: * Blocking send
100: * @param data
101: * @throws java.io.IOException
102: */
103: public void sendMessage(String sessionId, byte[] data)
104: throws java.io.IOException {
105: //get a socket sender from the pool
106: SocketSender sender = senderQueue.getSender(0);
107: if (sender == null) {
108: log.warn("No socket sender available for client="
109: + this .getAddress() + ":" + this .getPort()
110: + " did it disappear?");
111: return;
112: }//end if
113: //send the message
114: sender.sendMessage(sessionId, data);
115: //return the connection to the pool
116: senderQueue.returnSender(sender);
117: }
118:
119: public String toString() {
120: StringBuffer buf = new StringBuffer("PooledSocketSender[");
121: buf.append(getAddress()).append(":").append(getPort()).append(
122: "]");
123: return buf.toString();
124: }
125:
126: public boolean getSuspect() {
127: return suspect;
128: }
129:
130: public void setSuspect(boolean suspect) {
131: this .suspect = suspect;
132: }
133:
134: public long getKeepAliveTimeout() {
135: return keepAliveTimeout;
136: }
137:
138: public void setKeepAliveTimeout(long keepAliveTimeout) {
139: this .keepAliveTimeout = keepAliveTimeout;
140: }
141:
142: public int getKeepAliveMaxRequestCount() {
143: return keepAliveMaxRequestCount;
144: }
145:
146: public void setKeepAliveMaxRequestCount(int keepAliveMaxRequestCount) {
147: this .keepAliveMaxRequestCount = keepAliveMaxRequestCount;
148: }
149:
150: private class SenderQueue {
151: private int limit = 25;
152: PooledSocketSender parent = null;
153: private LinkedList queue = new LinkedList();
154: private LinkedList inuse = new LinkedList();
155: private Object mutex = new Object();
156: private boolean isOpen = true;
157:
158: public SenderQueue(PooledSocketSender parent, int limit) {
159: this .limit = limit;
160: this .parent = parent;
161: }
162:
163: public SocketSender getSender(long timeout) {
164: SocketSender sender = null;
165: long start = System.currentTimeMillis();
166: long delta = 0;
167: do {
168: synchronized (mutex) {
169: if (!isOpen)
170: throw new IllegalStateException(
171: "Socket pool is closed.");
172: if (queue.size() > 0) {
173: sender = (SocketSender) queue.removeFirst();
174: } else if (inuse.size() < limit) {
175: sender = getNewSocketSender();
176: } else {
177: try {
178: mutex.wait(timeout);
179: } catch (Exception x) {
180: parent.log
181: .warn(
182: "PoolSocketSender.senderQueue.getSender failed",
183: x);
184: }//catch
185: }//end if
186: if (sender != null) {
187: inuse.add(sender);
188: }
189: }//synchronized
190: delta = System.currentTimeMillis() - start;
191: } while ((isOpen) && (sender == null)
192: && (timeout == 0 ? true : (delta < timeout)));
193: //to do
194: return sender;
195: }
196:
197: public void returnSender(SocketSender sender) {
198: //to do
199: synchronized (mutex) {
200: queue.add(sender);
201: inuse.remove(sender);
202: mutex.notify();
203: }
204: }
205:
206: private SocketSender getNewSocketSender() {
207: //new SocketSender(
208: SocketSender sender = new SocketSender(parent.getAddress(),
209: parent.getPort());
210: sender.setKeepAliveMaxRequestCount(parent
211: .getKeepAliveMaxRequestCount());
212: sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
213: sender.setAckTimeout(parent.getAckTimeout());
214: return sender;
215:
216: }
217:
218: public void close() {
219: synchronized (mutex) {
220: for (int i = 0; i < queue.size(); i++) {
221: SocketSender sender = (SocketSender) queue.get(i);
222: sender.disconnect();
223: }//for
224: for (int i = 0; i < inuse.size(); i++) {
225: SocketSender sender = (SocketSender) inuse.get(i);
226: sender.disconnect();
227: }//for
228: queue.clear();
229: inuse.clear();
230: isOpen = false;
231: mutex.notifyAll();
232: }
233: }
234:
235: public void open() {
236: synchronized (mutex) {
237: isOpen = true;
238: mutex.notifyAll();
239: }
240: }
241: }
242: }
|