001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.transport;
018:
019: import java.io.IOException;
020: import java.net.InetAddress;
021: import java.net.InetSocketAddress;
022: import java.net.ServerSocket;
023: import java.util.concurrent.ExecutorService;
024: import java.util.concurrent.LinkedBlockingQueue;
025: import java.util.concurrent.ThreadPoolExecutor;
026: import java.util.concurrent.TimeUnit;
027:
028: import org.apache.catalina.tribes.ChannelMessage;
029: import org.apache.catalina.tribes.ChannelReceiver;
030: import org.apache.catalina.tribes.MessageListener;
031: import org.apache.catalina.tribes.io.ListenCallback;
032: import org.apache.juli.logging.Log;
033:
034: /**
035: * <p>Title: </p>
036: *
037: * <p>Description: </p>
038: *
039: * <p>Company: </p>
040: *
041: * @author not attributable
042: * @version 1.0
043: */
044: public abstract class ReceiverBase implements ChannelReceiver,
045: ListenCallback, RxTaskPool.TaskCreator {
046:
047: public static final int OPTION_DIRECT_BUFFER = 0x0004;
048:
049: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
050: .getLog(ReceiverBase.class);
051:
052: private MessageListener listener;
053: private String host = "auto";
054: private InetAddress bind;
055: private int port = 4000;
056: private int securePort = -1;
057: private int rxBufSize = 43800;
058: private int txBufSize = 25188;
059: private boolean listen = false;
060: private RxTaskPool pool;
061: private boolean direct = true;
062: private long tcpSelectorTimeout = 5000;
063: //how many times to search for an available socket
064: private int autoBind = 100;
065: private int maxThreads = Integer.MAX_VALUE;
066: private int minThreads = 6;
067: private int maxTasks = 100;
068: private int minTasks = 10;
069: private boolean tcpNoDelay = true;
070: private boolean soKeepAlive = false;
071: private boolean ooBInline = true;
072: private boolean soReuseAddress = true;
073: private boolean soLingerOn = true;
074: private int soLingerTime = 3;
075: private int soTrafficClass = 0x04 | 0x08 | 0x010;
076: private int timeout = 3000; //3 seconds
077: private boolean useBufferPool = true;
078:
079: private ExecutorService executor;
080:
081: public ReceiverBase() {
082: }
083:
084: public void start() throws IOException {
085: if (executor == null) {
086: executor = new ThreadPoolExecutor(minThreads, maxThreads,
087: 60, TimeUnit.SECONDS,
088: new LinkedBlockingQueue<Runnable>());
089: }
090: }
091:
092: public void stop() {
093: if (executor != null)
094: executor.shutdownNow();//ignore left overs
095: executor = null;
096: }
097:
098: /**
099: * getMessageListener
100: *
101: * @return MessageListener
102: * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
103: */
104: public MessageListener getMessageListener() {
105: return listener;
106: }
107:
108: /**
109: *
110: * @return The port
111: * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
112: */
113: public int getPort() {
114: return port;
115: }
116:
117: public int getRxBufSize() {
118: return rxBufSize;
119: }
120:
121: public int getTxBufSize() {
122: return txBufSize;
123: }
124:
125: /**
126: * @deprecated use getMinThreads()/getMaxThreads()
127: * @return int
128: */
129: public int getTcpThreadCount() {
130: return getMaxThreads();
131: }
132:
133: /**
134: * setMessageListener
135: *
136: * @param listener MessageListener
137: * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
138: */
139: public void setMessageListener(MessageListener listener) {
140: this .listener = listener;
141: }
142:
143: /**
144: * @deprecated use setPort
145: * @param tcpListenPort int
146: */
147: public void setTcpListenPort(int tcpListenPort) {
148: setPort(tcpListenPort);
149: }
150:
151: /**
152: * @deprecated use setAddress
153: * @param tcpListenHost String
154: */
155: public void setTcpListenAddress(String tcpListenHost) {
156: setAddress(tcpListenHost);
157: }
158:
159: public void setRxBufSize(int rxBufSize) {
160: this .rxBufSize = rxBufSize;
161: }
162:
163: public void setTxBufSize(int txBufSize) {
164: this .txBufSize = txBufSize;
165: }
166:
167: /**
168: * @deprecated use setMaxThreads/setMinThreads
169: * @param tcpThreadCount int
170: */
171: public void setTcpThreadCount(int tcpThreadCount) {
172: setMaxThreads(tcpThreadCount);
173: setMinThreads(tcpThreadCount);
174: }
175:
176: /**
177: * @return Returns the bind.
178: */
179: public InetAddress getBind() {
180: if (bind == null) {
181: try {
182: if ("auto".equals(host)) {
183: host = java.net.InetAddress.getLocalHost()
184: .getHostAddress();
185: }
186: if (log.isDebugEnabled())
187: log
188: .debug("Starting replication listener on address:"
189: + host);
190: bind = java.net.InetAddress.getByName(host);
191: } catch (IOException ioe) {
192: log.error(
193: "Failed bind replication listener on address:"
194: + host, ioe);
195: }
196: }
197: return bind;
198: }
199:
200: /**
201: * recursive bind to find the next available port
202: * @param socket ServerSocket
203: * @param portstart int
204: * @param retries int
205: * @return int
206: * @throws IOException
207: */
208: protected int bind(ServerSocket socket, int portstart, int retries)
209: throws IOException {
210: InetSocketAddress addr = null;
211: while (retries > 0) {
212: try {
213: addr = new InetSocketAddress(getBind(), portstart);
214: socket.bind(addr);
215: setPort(portstart);
216: log.info("Receiver Server Socket bound to:" + addr);
217: return 0;
218: } catch (IOException x) {
219: retries--;
220: if (retries <= 0) {
221: log.info("Unable to bind server socket to:" + addr
222: + " throwing error.");
223: throw x;
224: }
225: portstart++;
226: try {
227: Thread.sleep(25);
228: } catch (InterruptedException ti) {
229: Thread.currentThread().interrupted();
230: }
231: retries = bind(socket, portstart, retries);
232: }
233: }
234: return retries;
235: }
236:
237: public void messageDataReceived(ChannelMessage data) {
238: if (this .listener != null) {
239: if (listener.accept(data))
240: listener.messageReceived(data);
241: }
242: }
243:
244: public int getWorkerThreadOptions() {
245: int options = 0;
246: if (getDirect())
247: options = options | OPTION_DIRECT_BUFFER;
248: return options;
249: }
250:
251: /**
252: * @param bind The bind to set.
253: */
254: public void setBind(java.net.InetAddress bind) {
255: this .bind = bind;
256: }
257:
258: /**
259: * @deprecated use getPort
260: * @return int
261: */
262: public int getTcpListenPort() {
263: return getPort();
264: }
265:
266: public boolean getDirect() {
267: return direct;
268: }
269:
270: public void setDirect(boolean direct) {
271: this .direct = direct;
272: }
273:
274: public String getAddress() {
275: getBind();
276: return this .host;
277: }
278:
279: public String getHost() {
280: return getAddress();
281: }
282:
283: public long getSelectorTimeout() {
284: return tcpSelectorTimeout;
285: }
286:
287: /**
288: * @deprecated use getSelectorTimeout
289: * @return long
290: */
291: public long getTcpSelectorTimeout() {
292: return getSelectorTimeout();
293: }
294:
295: public boolean doListen() {
296: return listen;
297: }
298:
299: public MessageListener getListener() {
300: return listener;
301: }
302:
303: public RxTaskPool getTaskPool() {
304: return pool;
305: }
306:
307: /**
308: * @deprecated use getAddress
309: * @return String
310: */
311: public String getTcpListenAddress() {
312: return getAddress();
313: }
314:
315: public int getAutoBind() {
316: return autoBind;
317: }
318:
319: public int getMaxThreads() {
320: return maxThreads;
321: }
322:
323: public int getMinThreads() {
324: return minThreads;
325: }
326:
327: public boolean getTcpNoDelay() {
328: return tcpNoDelay;
329: }
330:
331: public boolean getSoKeepAlive() {
332: return soKeepAlive;
333: }
334:
335: public boolean getOoBInline() {
336: return ooBInline;
337: }
338:
339: public boolean getSoLingerOn() {
340: return soLingerOn;
341: }
342:
343: public int getSoLingerTime() {
344: return soLingerTime;
345: }
346:
347: public boolean getSoReuseAddress() {
348: return soReuseAddress;
349: }
350:
351: public int getSoTrafficClass() {
352: return soTrafficClass;
353: }
354:
355: public int getTimeout() {
356: return timeout;
357: }
358:
359: public boolean getUseBufferPool() {
360: return useBufferPool;
361: }
362:
363: public int getSecurePort() {
364: return securePort;
365: }
366:
367: public int getMinTasks() {
368: return minTasks;
369: }
370:
371: public int getMaxTasks() {
372: return maxTasks;
373: }
374:
375: public ExecutorService getExecutor() {
376: return executor;
377: }
378:
379: public boolean isListening() {
380: return listen;
381: }
382:
383: /**
384: * @deprecated use setSelectorTimeout
385: * @param selTimeout long
386: */
387: public void setTcpSelectorTimeout(long selTimeout) {
388: setSelectorTimeout(selTimeout);
389: }
390:
391: public void setSelectorTimeout(long selTimeout) {
392: tcpSelectorTimeout = selTimeout;
393: }
394:
395: public void setListen(boolean doListen) {
396: this .listen = doListen;
397: }
398:
399: public void setAddress(String host) {
400: this .host = host;
401: }
402:
403: public void setHost(String host) {
404: setAddress(host);
405: }
406:
407: public void setListener(MessageListener listener) {
408: this .listener = listener;
409: }
410:
411: public void setLog(Log log) {
412: this .log = log;
413: }
414:
415: public void setPool(RxTaskPool pool) {
416: this .pool = pool;
417: }
418:
419: public void setPort(int port) {
420: this .port = port;
421: }
422:
423: public void setAutoBind(int autoBind) {
424: this .autoBind = autoBind;
425: if (this .autoBind <= 0)
426: this .autoBind = 1;
427: }
428:
429: public void setMaxThreads(int maxThreads) {
430: this .maxThreads = maxThreads;
431: }
432:
433: public void setMinThreads(int minThreads) {
434: this .minThreads = minThreads;
435: }
436:
437: public void setTcpNoDelay(boolean tcpNoDelay) {
438: this .tcpNoDelay = tcpNoDelay;
439: }
440:
441: public void setSoKeepAlive(boolean soKeepAlive) {
442: this .soKeepAlive = soKeepAlive;
443: }
444:
445: public void setOoBInline(boolean ooBInline) {
446: this .ooBInline = ooBInline;
447: }
448:
449: public void setSoLingerOn(boolean soLingerOn) {
450: this .soLingerOn = soLingerOn;
451: }
452:
453: public void setSoLingerTime(int soLingerTime) {
454: this .soLingerTime = soLingerTime;
455: }
456:
457: public void setSoReuseAddress(boolean soReuseAddress) {
458: this .soReuseAddress = soReuseAddress;
459: }
460:
461: public void setSoTrafficClass(int soTrafficClass) {
462: this .soTrafficClass = soTrafficClass;
463: }
464:
465: public void setTimeout(int timeout) {
466: this .timeout = timeout;
467: }
468:
469: public void setUseBufferPool(boolean useBufferPool) {
470: this .useBufferPool = useBufferPool;
471: }
472:
473: public void setSecurePort(int securePort) {
474: this .securePort = securePort;
475: }
476:
477: public void setMinTasks(int minTasks) {
478: this .minTasks = minTasks;
479: }
480:
481: public void setMaxTasks(int maxTasks) {
482: this .maxTasks = maxTasks;
483: }
484:
485: public void setExecutor(ExecutorService executor) {
486: this .executor = executor;
487: }
488:
489: public void heartbeat() {
490: //empty operation
491: }
492:
493: }
|