001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2007 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: FourByteLengthEncodedHandler.java 9474 2007-10-09 15:24:22Z elu $
023: */
024: package com.bostechcorp.cbesb.custom.protocol;
025:
026: import javax.jbi.messaging.MessageExchange;
027:
028: import org.apache.commons.logging.Log;
029: import org.apache.commons.logging.LogFactory;
030:
031: import com.bostechcorp.cbesb.runtime.ccsl.lib.DumpNormalizedMessage;
032: import com.bostechcorp.cbesb.runtime.ccsl.lib.ITcpipContext;
033: import com.bostechcorp.cbesb.runtime.ccsl.lib.ITcpipHandler;
034:
035: public class FourByteLengthEncodedHandler implements ITcpipHandler {
036: protected static final short INPUT_WANT_LENGTH = 0;
037: protected static final short INPUT_WANT_DATA = 1;
038: protected static final short LENGTH_BYTES = 4;
039: protected static final int INPUT_TIMEOUT = 3000;
040:
041: protected Log logger = LogFactory.getLog(getClass());
042: protected ITcpipContext context;
043: protected StringBuffer inMessage;
044: protected short inputState;
045: protected boolean gotInMessage;
046: protected int bytesLeft;
047: protected byte[] lengthBytes = new byte[LENGTH_BYTES];
048:
049: public String getDescription() {
050: return "TCPIP four byte length encoded protocol handler";
051: }
052:
053: public void init(ITcpipContext ctx) {
054: context = ctx;
055: context.setSocketTimeout(0);
056: inMessage = new StringBuffer();
057: gotInMessage = false;
058: bytesLeft = LENGTH_BYTES;
059: inputState = INPUT_WANT_LENGTH;
060: }
061:
062: /*
063: * This handles received data in consumer mode.
064: */
065: public int gotReceiveData(byte[] bytes) throws Exception {
066: context.setSocketTimeout(INPUT_TIMEOUT);
067: int bytesConsumed = receiveStateMachine(bytes);
068: if (gotInMessage) {
069: // create the inbound exchange and possibly return the out message
070: byte[] inBytes = (new String(inMessage))
071: .getBytes("ISO8859-1");
072: logger.debug("got inbound message (" + inMessage.length()
073: + " bytes)\n"
074: + DumpNormalizedMessage.dumpBytesAsHex(inBytes));
075: byte[] outBytes = context.createInbound(inBytes);
076: if (outBytes != null)
077: sendMessage(outBytes);
078:
079: // reset the input state
080: inMessage = new StringBuffer();
081: inputState = INPUT_WANT_LENGTH;
082: bytesLeft = LENGTH_BYTES;
083: gotInMessage = false;
084: context.setSocketTimeout(0);
085: }
086: return bytesConsumed;
087: }
088:
089: /*
090: * Flush input state on a timeout
091: */
092: public void gotReceiveTimeout() throws Exception {
093: logger.info("TCPIP handler timed out on input");
094: inMessage = new StringBuffer();
095: inputState = INPUT_WANT_LENGTH;
096: bytesLeft = LENGTH_BYTES;
097: context.setSocketTimeout(0);
098: }
099:
100: /*
101: * No special processing for a socket error
102: */
103: public void gotSocketError(Exception e) throws Exception {
104: }
105:
106: /*
107: * Send an in-only message
108: */
109: public void processInOnlyBytes(byte[] bytes) throws Exception {
110: sendMessage(bytes);
111: }
112:
113: /*
114: * Send a message and return a reply
115: */
116: public byte[] processInOutBytes(byte[] bytes) throws Exception {
117: byte[] returnBytes = null;
118: sendMessage(bytes);
119: byte[] received;
120: context.setSocketTimeout(INPUT_TIMEOUT);
121: while ((received = context.receiveSocket()) != null) {
122: for (int bytesLeft = 0; (bytesLeft = received.length) > 0;) {
123: int consumed = receiveStateMachine(received);
124: if (gotInMessage)
125: break;
126: if (consumed > 0) {
127: byte[] newBytes = new byte[bytesLeft - consumed];
128: for (int i = 0; i < newBytes.length; i++)
129: newBytes[i] = received[consumed++];
130: received = newBytes;
131: }
132: }
133: if (gotInMessage) {
134: returnBytes = (new String(inMessage))
135: .getBytes("ISO8859-1");
136: inMessage = new StringBuffer();
137: inputState = INPUT_WANT_LENGTH;
138: gotInMessage = false;
139: bytesLeft = 4;
140: context.setSocketTimeout(0);
141: break;
142: }
143: }
144: return returnBytes;
145: }
146:
147: /*
148: * I don't use these since i'm using the byte array methods
149: */
150: public void processInOnlyExchange(MessageExchange e)
151: throws Exception {
152: }
153:
154: public void processInOutExchange(MessageExchange e)
155: throws Exception {
156: }
157:
158: /*
159: * Add the byte count and send a message
160: */
161: protected void sendMessage(byte[] bytes) throws Exception {
162: byte[] message = new byte[bytes.length + LENGTH_BYTES];
163: int length = bytes.length + LENGTH_BYTES;
164: for (int i = LENGTH_BYTES - 1; i >= 0; i--) {
165: message[i] = (byte) (length & 0xFF);
166: length >>= 8;
167: }
168: int i;
169: for (i = 0; i < bytes.length; i++)
170: message[i + LENGTH_BYTES] = bytes[i];
171: context.sendSocket(message);
172: }
173:
174: /*
175: * Detect messages
176: */
177: protected int receiveStateMachine(byte[] bytes) throws Exception {
178: int bytesConsumed = bytes.length;
179: int position = 0;
180: if (bytes.length < 1)
181: throw new Exception("zero bytes received");
182: switch (inputState) {
183: case INPUT_WANT_LENGTH:
184: while (bytesLeft > 0 && position < bytes.length) {
185: lengthBytes[--bytesLeft] = bytes[position++];
186: }
187:
188: if (bytesLeft > 0) {
189: bytesConsumed = position;
190: break;
191: }
192: inputState = INPUT_WANT_DATA;
193: bytesLeft = 0;
194: for (int i = LENGTH_BYTES - 1; i >= 0; i--)
195: bytesLeft = 256 * bytesLeft + lengthBytes[i];
196: bytesLeft -= LENGTH_BYTES;
197: // don't break here since we want to process the next state
198:
199: case INPUT_WANT_DATA:
200: int length = bytes.length - position;
201: if (length < bytesLeft) {
202: inMessage.append(new String(bytes, position, length,
203: "ISO8859-1"));
204: bytesLeft -= length;
205: } else {
206: inMessage.append(new String(bytes, position, bytesLeft,
207: "ISO8859-1"));
208: position += bytesLeft;
209: bytesConsumed = position;
210: gotInMessage = true;
211: }
212: break;
213: }
214: return bytesConsumed;
215: }
216:
217: }
|