001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017: package org.apache.catalina.tribes.transport.bio;
018:
019: import java.io.IOException;
020: import java.net.ServerSocket;
021: import java.net.Socket;
022:
023: import org.apache.catalina.tribes.ChannelReceiver;
024: import org.apache.catalina.tribes.io.ListenCallback;
025: import org.apache.catalina.tribes.io.ObjectReader;
026: import org.apache.catalina.tribes.transport.ReceiverBase;
027: import org.apache.catalina.tribes.transport.RxTaskPool;
028: import org.apache.catalina.tribes.transport.AbstractRxTask;
029:
030: /**
031: * <p>Title: </p>
032: *
033: * <p>Description: </p>
034: *
035: * <p>Company: </p>
036: *
037: * @author not attributable
038: * @version 1.0
039: */
040: public class BioReceiver extends ReceiverBase implements Runnable,
041: ChannelReceiver, ListenCallback {
042:
043: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
044: .getLog(BioReceiver.class);
045:
046: protected ServerSocket serverSocket;
047:
048: public BioReceiver() {
049: }
050:
051: /**
052: *
053: * @throws IOException
054: * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
055: */
056: public void start() throws IOException {
057: super .start();
058: try {
059: setPool(new RxTaskPool(getMaxThreads(), getMinThreads(),
060: this ));
061: } catch (Exception x) {
062: log.fatal("ThreadPool can initilzed. Listener not started",
063: x);
064: if (x instanceof IOException)
065: throw (IOException) x;
066: else
067: throw new IOException(x.getMessage());
068: }
069: try {
070: getBind();
071: bind();
072: Thread t = new Thread(this , "BioReceiver");
073: t.setDaemon(true);
074: t.start();
075: } catch (Exception x) {
076: log.fatal("Unable to start cluster receiver", x);
077: if (x instanceof IOException)
078: throw (IOException) x;
079: else
080: throw new IOException(x.getMessage());
081: }
082: }
083:
084: public AbstractRxTask createRxTask() {
085: return getReplicationThread();
086: }
087:
088: protected BioReplicationTask getReplicationThread() {
089: BioReplicationTask result = new BioReplicationTask(this );
090: result.setOptions(getWorkerThreadOptions());
091: result.setUseBufferPool(this .getUseBufferPool());
092: return result;
093: }
094:
095: /**
096: *
097: * @todo Implement this org.apache.catalina.tribes.ChannelReceiver method
098: */
099: public void stop() {
100: setListen(false);
101: try {
102: this .serverSocket.close();
103: } catch (Exception x) {
104: }
105: super .stop();
106: }
107:
108: protected void bind() throws IOException {
109: // allocate an unbound server socket channel
110: serverSocket = new ServerSocket();
111: // set the port the server channel will listen to
112: //serverSocket.bind(new InetSocketAddress(getBind(), getTcpListenPort()));
113: bind(serverSocket, getPort(), getAutoBind());
114: }
115:
116: public void run() {
117: try {
118: listen();
119: } catch (Exception x) {
120: log.error("Unable to run replication listener.", x);
121: }
122: }
123:
124: public void listen() throws Exception {
125: if (doListen()) {
126: log.warn("ServerSocket already started");
127: return;
128: }
129: setListen(true);
130:
131: while (doListen()) {
132: Socket socket = null;
133: if (getTaskPool().available() < 1) {
134: if (log.isWarnEnabled())
135: log
136: .warn("All BIO server replication threads are busy, unable to handle more requests until a thread is freed up.");
137: }
138: BioReplicationTask task = (BioReplicationTask) getTaskPool()
139: .getRxTask();
140: if (task == null)
141: continue; //should never happen
142: try {
143: socket = serverSocket.accept();
144: } catch (Exception x) {
145: if (doListen())
146: throw x;
147: }
148: if (!doListen()) {
149: task.setDoRun(false);
150: task.serviceSocket(null, null);
151: getExecutor().execute(task);
152: break; //regular shutdown
153: }
154: if (socket == null)
155: continue;
156: socket.setReceiveBufferSize(getRxBufSize());
157: socket.setSendBufferSize(getRxBufSize());
158: socket.setTcpNoDelay(getTcpNoDelay());
159: socket.setKeepAlive(getSoKeepAlive());
160: socket.setOOBInline(getOoBInline());
161: socket.setReuseAddress(getSoReuseAddress());
162: socket.setSoLinger(getSoLingerOn(), getSoLingerTime());
163: socket.setTrafficClass(getSoTrafficClass());
164: socket.setSoTimeout(getTimeout());
165: ObjectReader reader = new ObjectReader(socket);
166: task.serviceSocket(socket, reader);
167: }//while
168: }
169:
170: }
|