001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport.bio;
019:
020: import java.io.IOException;
021: import java.io.InputStream;
022: import java.io.OutputStream;
023: import java.net.InetSocketAddress;
024: import java.net.Socket;
025: import java.util.Arrays;
026:
027: import org.apache.catalina.tribes.RemoteProcessException;
028: import org.apache.catalina.tribes.io.XByteBuffer;
029: import org.apache.catalina.tribes.transport.AbstractSender;
030: import org.apache.catalina.tribes.transport.Constants;
031: import org.apache.catalina.tribes.transport.DataSender;
032: import org.apache.catalina.tribes.transport.SenderState;
033: import org.apache.catalina.tribes.util.StringManager;
034:
035: /**
036: * Send cluster messages with only one socket. Ack and keep Alive Handling is
037: * supported
038: *
039: * @author Peter Rossbach
040: * @author Filip Hanik
041: * @version $Revision: 532608 $ $Date: 2007-04-26 06:58:20 +0200 (jeu., 26 avr. 2007) $
042: * @since 5.5.16
043: */
044: public class BioSender extends AbstractSender implements DataSender {
045:
046: private static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
047: .getLog(BioSender.class);
048:
049: /**
050: * The string manager for this package.
051: */
052: protected static StringManager sm = StringManager
053: .getManager(Constants.Package);
054:
055: // ----------------------------------------------------- Instance Variables
056:
057: /**
058: * The descriptive information about this implementation.
059: */
060: private static final String info = "DataSender/3.0";
061:
062: /**
063: * current sender socket
064: */
065: private Socket socket = null;
066: private OutputStream soOut = null;
067: private InputStream soIn = null;
068:
069: protected XByteBuffer ackbuf = new XByteBuffer(
070: Constants.ACK_COMMAND.length, true);
071:
072: // ------------------------------------------------------------- Constructor
073:
074: public BioSender() {
075: }
076:
077: // ------------------------------------------------------------- Properties
078:
079: /**
080: * Return descriptive information about this implementation and the
081: * corresponding version number, in the format
082: * <code><description>/<version></code>.
083: */
084: public String getInfo() {
085: return (info);
086: }
087:
088: // --------------------------------------------------------- Public Methods
089:
090: /**
091: * Connect other cluster member receiver
092: * @see org.apache.catalina.tribes.transport.IDataSender#connect()
093: */
094: public void connect() throws IOException {
095: openSocket();
096: }
097:
098: /**
099: * disconnect and close socket
100: *
101: * @see IDataSender#disconnect()
102: */
103: public void disconnect() {
104: boolean connect = isConnected();
105: closeSocket();
106: if (connect) {
107: if (log.isDebugEnabled())
108: log.debug(sm.getString("IDataSender.disconnect",
109: getAddress().getHostAddress(), new Integer(
110: getPort()), new Long(0)));
111: }
112:
113: }
114:
115: /**
116: * Send message
117: *
118: * @see org.apache.catalina.tribes.transport.IDataSender#sendMessage(,
119: * ChannelMessage)
120: */
121: public void sendMessage(byte[] data, boolean waitForAck)
122: throws IOException {
123: IOException exception = null;
124: setAttempt(0);
125: try {
126: // first try with existing connection
127: pushMessage(data, false, waitForAck);
128: } catch (IOException x) {
129: SenderState.getSenderState(getDestination()).setSuspect();
130: exception = x;
131: if (log.isTraceEnabled())
132: log.trace(sm.getString("IDataSender.send.again",
133: getAddress().getHostAddress(), new Integer(
134: getPort())), x);
135: while (getAttempt() < getMaxRetryAttempts()) {
136: try {
137: setAttempt(getAttempt() + 1);
138: // second try with fresh connection
139: pushMessage(data, true, waitForAck);
140: exception = null;
141: } catch (IOException xx) {
142: exception = xx;
143: closeSocket();
144: }
145: }
146: } finally {
147: setRequestCount(getRequestCount() + 1);
148: keepalive();
149: if (exception != null)
150: throw exception;
151: }
152: }
153:
154: /**
155: * Name of this SockerSender
156: */
157: public String toString() {
158: StringBuffer buf = new StringBuffer("DataSender[(");
159: buf.append(super .toString()).append(")");
160: buf.append(getAddress()).append(":").append(getPort()).append(
161: "]");
162: return buf.toString();
163: }
164:
165: // --------------------------------------------------------- Protected Methods
166:
167: /**
168: * open real socket and set time out when waitForAck is enabled
169: * is socket open return directly
170: */
171: protected void openSocket() throws IOException {
172: if (isConnected())
173: return;
174: try {
175: socket = new Socket();
176: InetSocketAddress sockaddr = new InetSocketAddress(
177: getAddress(), getPort());
178: socket.connect(sockaddr, (int) getTimeout());
179: socket.setSendBufferSize(getTxBufSize());
180: socket.setReceiveBufferSize(getRxBufSize());
181: socket.setSoTimeout((int) getTimeout());
182: socket.setTcpNoDelay(getTcpNoDelay());
183: socket.setKeepAlive(getSoKeepAlive());
184: socket.setReuseAddress(getSoReuseAddress());
185: socket.setOOBInline(getOoBInline());
186: socket.setSoLinger(getSoLingerOn(), getSoLingerTime());
187: socket.setTrafficClass(getSoTrafficClass());
188: setConnected(true);
189: soOut = socket.getOutputStream();
190: soIn = socket.getInputStream();
191: setRequestCount(0);
192: setConnectTime(System.currentTimeMillis());
193: if (log.isDebugEnabled())
194: log.debug(sm.getString("IDataSender.openSocket",
195: getAddress().getHostAddress(), new Integer(
196: getPort()), new Long(0)));
197: } catch (IOException ex1) {
198: SenderState.getSenderState(getDestination()).setSuspect();
199: if (log.isDebugEnabled())
200: log.debug(sm.getString(
201: "IDataSender.openSocket.failure", getAddress()
202: .getHostAddress(), new Integer(
203: getPort()), new Long(0)), ex1);
204: throw (ex1);
205: }
206:
207: }
208:
209: /**
210: * close socket
211: *
212: * @see DataSender#disconnect()
213: * @see DataSender#closeSocket()
214: */
215: protected void closeSocket() {
216: if (isConnected()) {
217: if (socket != null) {
218: try {
219: socket.close();
220: } catch (IOException x) {
221: } finally {
222: socket = null;
223: soOut = null;
224: soIn = null;
225: }
226: }
227: setRequestCount(0);
228: setConnected(false);
229: if (log.isDebugEnabled())
230: log.debug(sm.getString("IDataSender.closeSocket",
231: getAddress().getHostAddress(), new Integer(
232: getPort()), new Long(0)));
233: }
234: }
235:
236: /**
237: * Push messages with only one socket at a time
238: * Wait for ack is needed and make auto retry when write message is failed.
239: * After sending error close and reopen socket again.
240: *
241: * After successfull sending update stats
242: *
243: * WARNING: Subclasses must be very carefull that only one thread call this pushMessage at once!!!
244: *
245: * @see #closeSocket()
246: * @see #openSocket()
247: * @see #writeData(ChannelMessage)
248: *
249: * @param data
250: * data to send
251: * @since 5.5.10
252: */
253:
254: protected void pushMessage(byte[] data, boolean reconnect,
255: boolean waitForAck) throws IOException {
256: keepalive();
257: if (reconnect)
258: closeSocket();
259: if (!isConnected())
260: openSocket();
261: soOut.write(data);
262: soOut.flush();
263: if (waitForAck)
264: waitForAck();
265: SenderState.getSenderState(getDestination()).setReady();
266:
267: }
268:
269: /**
270: * Wait for Acknowledgement from other server
271: * FIXME Please, not wait only for three charcters, better control that the wait ack message is correct.
272: * @param timeout
273: * @throws java.io.IOException
274: * @throws java.net.SocketTimeoutException
275: */
276: protected void waitForAck() throws java.io.IOException {
277: try {
278: boolean ackReceived = false;
279: boolean failAckReceived = false;
280: ackbuf.clear();
281: int bytesRead = 0;
282: int i = soIn.read();
283: while ((i != -1)
284: && (bytesRead < Constants.ACK_COMMAND.length)) {
285: bytesRead++;
286: byte d = (byte) i;
287: ackbuf.append(d);
288: if (ackbuf.doesPackageExist()) {
289: byte[] ackcmd = ackbuf.extractDataPackage(true)
290: .getBytes();
291: ackReceived = Arrays
292: .equals(
293: ackcmd,
294: org.apache.catalina.tribes.transport.Constants.ACK_DATA);
295: failAckReceived = Arrays
296: .equals(
297: ackcmd,
298: org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
299: ackReceived = ackReceived || failAckReceived;
300: break;
301: }
302: i = soIn.read();
303: }
304: if (!ackReceived) {
305: if (i == -1)
306: throw new IOException(sm.getString(
307: "IDataSender.ack.eof", getAddress(),
308: new Integer(socket.getLocalPort())));
309: else
310: throw new IOException(sm.getString(
311: "IDataSender.ack.wrong", getAddress(),
312: new Integer(socket.getLocalPort())));
313: } else if (failAckReceived && getThrowOnFailedAck()) {
314: throw new RemoteProcessException(
315: "Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
316: }
317: } catch (IOException x) {
318: String errmsg = sm.getString("IDataSender.ack.missing",
319: getAddress(), new Integer(socket.getLocalPort()),
320: new Long(getTimeout()));
321: if (SenderState.getSenderState(getDestination()).isReady()) {
322: SenderState.getSenderState(getDestination())
323: .setSuspect();
324: if (log.isWarnEnabled())
325: log.warn(errmsg, x);
326: } else {
327: if (log.isDebugEnabled())
328: log.debug(errmsg, x);
329: }
330: throw x;
331: } finally {
332: ackbuf.clear();
333: }
334: }
335: }
|