001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one or more
003: * contributor license agreements. See the NOTICE file distributed with
004: * this work for additional information regarding copyright ownership.
005: * The ASF licenses this file to You under the Apache License, Version 2.0
006: * (the "License"); you may not use this file except in compliance with
007: * the License. You may obtain a copy of the License at
008: *
009: * http://www.apache.org/licenses/LICENSE-2.0
010: *
011: * Unless required by applicable law or agreed to in writing, software
012: * distributed under the License is distributed on an "AS IS" BASIS,
013: * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014: * See the License for the specific language governing permissions and
015: * limitations under the License.
016: */
017:
018: package org.apache.catalina.tribes.transport.nio;
019:
020: import java.io.IOException;
021: import java.net.InetSocketAddress;
022: import java.nio.ByteBuffer;
023: import java.nio.channels.SelectionKey;
024: import java.nio.channels.Selector;
025: import java.nio.channels.SocketChannel;
026: import java.util.Arrays;
027:
028: import org.apache.catalina.tribes.io.XByteBuffer;
029: import org.apache.catalina.tribes.transport.AbstractSender;
030: import org.apache.catalina.tribes.transport.DataSender;
031: import org.apache.catalina.tribes.RemoteProcessException;
032: import java.io.EOFException;
033: import java.net.*;
034:
/**
 * This class is NOT thread safe and must never be used by more than one thread at a time.
 *
 * This is a state machine, driven by the process method.
 * States are:
 * - NOT_CONNECTED -> connect() -> CONNECTED
 * - CONNECTED -> setMessage() -> READY_TO_WRITE
 * - READY_TO_WRITE -> write() -> READY_TO_WRITE | READY_TO_READ
 * - READY_TO_READ -> read() -> READY_TO_READ | TRANSFER_COMPLETE
 * - TRANSFER_COMPLETE -> CONNECTED
 *
 * @author Filip Hanik
 * @version 1.0
 */
049: public class NioSender extends AbstractSender implements DataSender {
050:
051: protected static org.apache.juli.logging.Log log = org.apache.juli.logging.LogFactory
052: .getLog(NioSender.class);
053:
054: protected Selector selector;
055: protected SocketChannel socketChannel;
056:
057: /*
058: * STATE VARIABLES *
059: */
060: protected ByteBuffer readbuf = null;
061: protected ByteBuffer writebuf = null;
062: protected byte[] current = null;
063: protected XByteBuffer ackbuf = new XByteBuffer(128, true);
064: protected int remaining = 0;
065: protected boolean complete;
066:
067: protected boolean connecting = false;
068:
069: public NioSender() {
070: super ();
071:
072: }
073:
074: /**
075: * State machine to send data
076: * @param key SelectionKey
077: * @return boolean
078: * @throws IOException
079: */
080: public boolean process(SelectionKey key, boolean waitForAck)
081: throws IOException {
082: int ops = key.readyOps();
083: key.interestOps(key.interestOps() & ~ops);
084: //in case disconnect has been called
085: if ((!isConnected()) && (!connecting))
086: throw new IOException(
087: "Sender has been disconnected, can't selection key.");
088: if (!key.isValid())
089: throw new IOException(
090: "Key is not valid, it must have been cancelled.");
091: if (key.isConnectable()) {
092: if (socketChannel.finishConnect()) {
093: completeConnect();
094: if (current != null)
095: key.interestOps(key.interestOps()
096: | SelectionKey.OP_WRITE);
097: return false;
098: } else {
099: //wait for the connection to finish
100: key.interestOps(key.interestOps()
101: | SelectionKey.OP_CONNECT);
102: return false;
103: }//end if
104: } else if (key.isWritable()) {
105: boolean writecomplete = write(key);
106: if (writecomplete) {
107: //we are completed, should we read an ack?
108: if (waitForAck) {
109: //register to read the ack
110: key.interestOps(key.interestOps()
111: | SelectionKey.OP_READ);
112: } else {
113: //if not, we are ready, setMessage will reregister us for another write interest
114: //do a health check, we have no way of verify a disconnected
115: //socket since we don't register for OP_READ on waitForAck=false
116: read(key);//this causes overhead
117: setRequestCount(getRequestCount() + 1);
118: return true;
119: }
120: } else {
121: //we are not complete, lets write some more
122: key.interestOps(key.interestOps()
123: | SelectionKey.OP_WRITE);
124: }//end if
125: } else if (key.isReadable()) {
126: boolean readcomplete = read(key);
127: if (readcomplete) {
128: setRequestCount(getRequestCount() + 1);
129: return true;
130: } else {
131: key.interestOps(key.interestOps()
132: | SelectionKey.OP_READ);
133: }//end if
134: } else {
135: //unknown state, should never happen
136: log.warn("Data is in unknown state. readyOps=" + ops);
137: throw new IOException("Data is in unknown state. readyOps="
138: + ops);
139: }//end if
140: return false;
141: }
142:
143: private void completeConnect() throws SocketException {
144: //we connected, register ourselves for writing
145: setConnected(true);
146: connecting = false;
147: setRequestCount(0);
148: setConnectTime(System.currentTimeMillis());
149: socketChannel.socket().setSendBufferSize(getTxBufSize());
150: socketChannel.socket().setReceiveBufferSize(getRxBufSize());
151: socketChannel.socket().setSoTimeout((int) getTimeout());
152: socketChannel.socket().setSoLinger(getSoLingerOn(),
153: getSoLingerOn() ? getSoLingerTime() : 0);
154: socketChannel.socket().setTcpNoDelay(getTcpNoDelay());
155: socketChannel.socket().setKeepAlive(getSoKeepAlive());
156: socketChannel.socket().setReuseAddress(getSoReuseAddress());
157: socketChannel.socket().setOOBInline(getOoBInline());
158: socketChannel.socket().setSoLinger(getSoLingerOn(),
159: getSoLingerTime());
160: socketChannel.socket().setTrafficClass(getSoTrafficClass());
161: }
162:
163: protected boolean read(SelectionKey key) throws IOException {
164: //if there is no message here, we are done
165: if (current == null)
166: return true;
167: int read = socketChannel.read(readbuf);
168: //end of stream
169: if (read == -1)
170: throw new IOException(
171: "Unable to receive an ack message. EOF on socket channel has been reached.");
172: //no data read
173: else if (read == 0)
174: return false;
175: readbuf.flip();
176: ackbuf.append(readbuf, read);
177: readbuf.clear();
178: if (ackbuf.doesPackageExist()) {
179: byte[] ackcmd = ackbuf.extractDataPackage(true).getBytes();
180: boolean ack = Arrays
181: .equals(
182: ackcmd,
183: org.apache.catalina.tribes.transport.Constants.ACK_DATA);
184: boolean fack = Arrays
185: .equals(
186: ackcmd,
187: org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA);
188: if (fack && getThrowOnFailedAck())
189: throw new RemoteProcessException(
190: "Received a failed ack:org.apache.catalina.tribes.transport.Constants.FAIL_ACK_DATA");
191: return ack || fack;
192: } else {
193: return false;
194: }
195: }
196:
197: protected boolean write(SelectionKey key) throws IOException {
198: if ((!isConnected()) || (this .socketChannel == null)) {
199: throw new IOException(
200: "NioSender is not connected, this should not occur.");
201: }
202: if (current != null) {
203: if (remaining > 0) {
204: //weve written everything, or we are starting a new package
205: //protect against buffer overwrite
206: int byteswritten = socketChannel.write(writebuf);
207: if (byteswritten == -1)
208: throw new EOFException();
209: remaining -= byteswritten;
210: //if the entire message was written from the buffer
211: //reset the position counter
212: if (remaining < 0) {
213: remaining = 0;
214: }
215: }
216: return (remaining == 0);
217: }
218: //no message to send, we can consider that complete
219: return true;
220: }
221:
222: /**
223: * connect - blocking in this operation
224: *
225: * @throws IOException
226: * @todo Implement this org.apache.catalina.tribes.transport.IDataSender method
227: */
228: public synchronized void connect() throws IOException {
229: if (connecting)
230: return;
231: connecting = true;
232: if (isConnected())
233: throw new IOException(
234: "NioSender is already in connected state.");
235: if (readbuf == null) {
236: readbuf = getReadBuffer();
237: } else {
238: readbuf.clear();
239: }
240: if (writebuf == null) {
241: writebuf = getWriteBuffer();
242: } else {
243: writebuf.clear();
244: }
245:
246: InetSocketAddress addr = new InetSocketAddress(getAddress(),
247: getPort());
248: if (socketChannel != null)
249: throw new IOException(
250: "Socket channel has already been established. Connection might be in progress.");
251: socketChannel = SocketChannel.open();
252: socketChannel.configureBlocking(false);
253: if (socketChannel.connect(addr)) {
254: completeConnect();
255: socketChannel.register(getSelector(),
256: SelectionKey.OP_WRITE, this );
257: } else {
258: socketChannel.register(getSelector(),
259: SelectionKey.OP_CONNECT, this );
260: }
261: }
262:
263: /**
264: * disconnect
265: *
266: * @todo Implement this org.apache.catalina.tribes.transport.IDataSender method
267: */
268: public void disconnect() {
269: try {
270: connecting = false;
271: setConnected(false);
272: if (socketChannel != null) {
273: try {
274: try {
275: socketChannel.socket().close();
276: } catch (Exception x) {
277: }
278: //error free close, all the way
279: //try {socket.shutdownOutput();}catch ( Exception x){}
280: //try {socket.shutdownInput();}catch ( Exception x){}
281: //try {socket.close();}catch ( Exception x){}
282: try {
283: socketChannel.close();
284: } catch (Exception x) {
285: }
286: } finally {
287: socketChannel = null;
288: }
289: }
290: } catch (Exception x) {
291: log.error("Unable to disconnect NioSender. msg="
292: + x.getMessage());
293: if (log.isDebugEnabled())
294: log.debug("Unable to disconnect NioSender. msg="
295: + x.getMessage(), x);
296: } finally {
297: }
298:
299: }
300:
301: public void reset() {
302: if (isConnected() && readbuf == null) {
303: readbuf = getReadBuffer();
304: }
305: if (readbuf != null)
306: readbuf.clear();
307: if (writebuf != null)
308: writebuf.clear();
309: current = null;
310: ackbuf.clear();
311: remaining = 0;
312: complete = false;
313: setAttempt(0);
314: setRequestCount(0);
315: setConnectTime(-1);
316: }
317:
318: private ByteBuffer getReadBuffer() {
319: return getBuffer(getRxBufSize());
320: }
321:
322: private ByteBuffer getWriteBuffer() {
323: return getBuffer(getTxBufSize());
324: }
325:
326: private ByteBuffer getBuffer(int size) {
327: return (getDirectBuffer() ? ByteBuffer.allocateDirect(size)
328: : ByteBuffer.allocate(size));
329: }
330:
331: /**
332: * sendMessage
333: *
334: * @param data ChannelMessage
335: * @throws IOException
336: * @todo Implement this org.apache.catalina.tribes.transport.IDataSender method
337: */
338: public synchronized void setMessage(byte[] data) throws IOException {
339: setMessage(data, 0, data.length);
340: }
341:
342: public synchronized void setMessage(byte[] data, int offset,
343: int length) throws IOException {
344: if (data != null) {
345: current = data;
346: remaining = length;
347: ackbuf.clear();
348: if (writebuf != null)
349: writebuf.clear();
350: else
351: writebuf = getBuffer(length);
352: if (writebuf.capacity() < length)
353: writebuf = getBuffer(length);
354: writebuf.put(data, offset, length);
355: //writebuf.rewind();
356: //set the limit so that we don't write non wanted data
357: //writebuf.limit(length);
358: writebuf.flip();
359: if (isConnected()) {
360: socketChannel.register(getSelector(),
361: SelectionKey.OP_WRITE, this );
362: }
363: }
364: }
365:
366: public byte[] getMessage() {
367: return current;
368: }
369:
370: public boolean isComplete() {
371: return complete;
372: }
373:
374: public Selector getSelector() {
375: return selector;
376: }
377:
378: public void setSelector(Selector selector) {
379: this .selector = selector;
380: }
381:
382: public void setComplete(boolean complete) {
383: this.complete = complete;
384: }
385: }
|