001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.transport;
018:
019: import java.io.IOException;
020: import java.util.List;
021: import org.apache.catalina.tribes.Member;
022:
023: /**
024: * <p>Title: </p>
025: *
026: * <p>Description: </p>
027: *
028: * <p>Company: </p>
029: *
030: * @author not attributable
031: * @version 1.0
032: */
033: public abstract class PooledSender extends AbstractSender implements
034: MultiPointSender {
035:
036: private SenderQueue queue = null;
037: private int poolSize = 25;
038:
039: public PooledSender() {
040: queue = new SenderQueue(this , poolSize);
041: }
042:
043: public abstract DataSender getNewDataSender();
044:
045: public DataSender getSender() {
046: return queue.getSender(getTimeout());
047: }
048:
049: public void returnSender(DataSender sender) {
050: sender.keepalive();
051: queue.returnSender(sender);
052: }
053:
054: public synchronized void connect() throws IOException {
055: //do nothing, happens in the socket sender itself
056: queue.open();
057: setConnected(true);
058: }
059:
060: public synchronized void disconnect() {
061: queue.close();
062: setConnected(false);
063: }
064:
065: public int getInPoolSize() {
066: return queue.getInPoolSize();
067: }
068:
069: public int getInUsePoolSize() {
070: return queue.getInUsePoolSize();
071: }
072:
073: public void setPoolSize(int poolSize) {
074: this .poolSize = poolSize;
075: queue.setLimit(poolSize);
076: }
077:
078: public int getPoolSize() {
079: return poolSize;
080: }
081:
082: public boolean keepalive() {
083: //do nothing, the pool checks on every return
084: return (queue == null) ? false : queue.checkIdleKeepAlive();
085: }
086:
087: public void add(Member member) {
088: // no op, senders created upon demans
089: }
090:
091: public void remove(Member member) {
092: //no op for now, should not cancel out any keys
093: //can create serious sync issues
094: //all TCP connections are cleared out through keepalive
095: //and if remote node disappears
096: }
097:
098: // ----------------------------------------------------- Inner Class
099:
100: private class SenderQueue {
101: private int limit = 25;
102:
103: PooledSender parent = null;
104:
105: private List notinuse = null;
106:
107: private List inuse = null;
108:
109: private boolean isOpen = true;
110:
111: public SenderQueue(PooledSender parent, int limit) {
112: this .limit = limit;
113: this .parent = parent;
114: notinuse = new java.util.LinkedList();
115: inuse = new java.util.LinkedList();
116: }
117:
118: /**
119: * @return Returns the limit.
120: */
121: public int getLimit() {
122: return limit;
123: }
124:
125: /**
126: * @param limit The limit to set.
127: */
128: public void setLimit(int limit) {
129: this .limit = limit;
130: }
131:
132: /**
133: * @return
134: */
135: public int getInUsePoolSize() {
136: return inuse.size();
137: }
138:
139: /**
140: * @return
141: */
142: public int getInPoolSize() {
143: return notinuse.size();
144: }
145:
146: public synchronized boolean checkIdleKeepAlive() {
147: DataSender[] list = new DataSender[notinuse.size()];
148: notinuse.toArray(list);
149: boolean result = false;
150: for (int i = 0; i < list.length; i++) {
151: result = result | list[i].keepalive();
152: }
153: return result;
154: }
155:
156: public synchronized DataSender getSender(long timeout) {
157: long start = System.currentTimeMillis();
158: while (true) {
159: if (!isOpen)
160: throw new IllegalStateException("Queue is closed");
161: DataSender sender = null;
162: if (notinuse.size() == 0 && inuse.size() < limit) {
163: sender = parent.getNewDataSender();
164: } else if (notinuse.size() > 0) {
165: sender = (DataSender) notinuse.remove(0);
166: }
167: if (sender != null) {
168: inuse.add(sender);
169: return sender;
170: }//end if
171: long delta = System.currentTimeMillis() - start;
172: if (delta > timeout && timeout > 0)
173: return null;
174: else {
175: try {
176: wait(Math.max(timeout - delta, 1));
177: } catch (InterruptedException x) {
178: }
179: }//end if
180: }
181: }
182:
183: public synchronized void returnSender(DataSender sender) {
184: if (!isOpen) {
185: sender.disconnect();
186: return;
187: }
188: //to do
189: inuse.remove(sender);
190: //just in case the limit has changed
191: if (notinuse.size() < this .getLimit())
192: notinuse.add(sender);
193: else
194: try {
195: sender.disconnect();
196: } catch (Exception ignore) {
197: }
198: notify();
199: }
200:
201: public synchronized void close() {
202: isOpen = false;
203: Object[] unused = notinuse.toArray();
204: Object[] used = inuse.toArray();
205: for (int i = 0; i < unused.length; i++) {
206: DataSender sender = (DataSender) unused[i];
207: sender.disconnect();
208: }//for
209: for (int i = 0; i < used.length; i++) {
210: DataSender sender = (DataSender) used[i];
211: sender.disconnect();
212: }//for
213: notinuse.clear();
214: inuse.clear();
215: notify();
216:
217: }
218:
219: public synchronized void open() {
220: isOpen = true;
221: notify();
222: }
223: }
224: }
|