001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2007 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: CrHandler.java 9724 2007-10-24 14:33:29Z mpreston $
023: */
024: package com.bostechcorp.cbesb.custom.protocol;
025:
026: import javax.jbi.messaging.MessageExchange;
027:
028: import org.apache.commons.logging.Log;
029: import org.apache.commons.logging.LogFactory;
030:
031: import com.bostechcorp.cbesb.runtime.ccsl.lib.DumpNormalizedMessage;
032: import com.bostechcorp.cbesb.runtime.ccsl.lib.ITcpipContext;
033: import com.bostechcorp.cbesb.runtime.ccsl.lib.ITcpipHandler;
034:
035: public class CrHandler implements ITcpipHandler {
036: protected static final int INPUT_TIMEOUT = 3000;
037: protected static final int READ_TIMEOUT = 7000;
038: protected static final short INPUT_WANT_0D = 0;
039: protected static final short INPUT_WANT_0A = 1;
040:
041: protected Log logger = LogFactory.getLog(getClass());
042: protected ITcpipContext context;
043: protected StringBuffer inMessage;
044: protected boolean lastByteWasCr;
045: protected short inputState;
046: protected boolean gotInMessage;
047:
048: public String getDescription() {
049: return "TCPIP carriage return protocol handler";
050: }
051:
052: public void init(ITcpipContext ctx) {
053: context = ctx;
054: context.setSocketTimeout(0);
055: inMessage = new StringBuffer();
056: gotInMessage = false;
057: inputState = INPUT_WANT_0D;
058: }
059:
060: /*
061: * This handles received data in consumer mode.
062: */
063: public int gotReceiveData(byte[] bytes) throws Exception {
064: /*
065: * old stuff from previous version
066: int bytesConsumed = receiveStateMachine(bytes);
067: int startPosition=0;
068: if (lastByteWasCr && bytes[0] == (byte)0x0A) startPosition++;
069: lastByteWasCr = false;
070: //logger.info("CrHandler gotReceiveData");
071: if (bytes.length > startPosition) {
072: context.setSocketTimeout(INPUT_TIMEOUT);
073: int crPosition = findByte(bytes, 0, (byte)0x0D);
074: //logger.info("CrHandler crPosition");
075: if (crPosition < 0) {
076: bytesConsumed = bytes.length;
077: inMessage.append(new String(bytes, startPosition, bytes.length-startPosition));
078: } else {
079: lastByteWasCr = true;
080: bytesConsumed = crPosition+1;
081: inMessage.append(new String(bytes, 0, crPosition));
082: processInMessage();
083: }
084: } else bytesConsumed = bytes.length;
085: return bytesConsumed;
086: */
087: context.setSocketTimeout(READ_TIMEOUT);
088: int bytesConsumed = receiveStateMachine(bytes);
089: if (gotInMessage) {
090: // create the inbound exchange and possibly return the out message
091: byte[] inBytes = (new String(inMessage))
092: .getBytes("ISO8859-1");
093: logger.debug("got inbound message (" + inMessage.length()
094: + " bytes)\n"
095: + DumpNormalizedMessage.dumpBytesAsHex(inBytes));
096: byte[] outBytes = context.createInbound(inBytes);
097: if (outBytes != null)
098: sendMessage(outBytes);
099:
100: // reset the input state
101: inMessage = new StringBuffer();
102: inputState = INPUT_WANT_0D;
103: gotInMessage = false;
104: context.setSocketTimeout(0);
105: }
106: return bytesConsumed;
107: }
108:
109: /*
110: * Create a message on an inter-character timeout
111: */
112: public void gotReceiveTimeout() throws Exception {
113: logger.info("TCPIP handler timed out on input");
114: processInMessage();
115: }
116:
117: /*
118: * Create an inbound exchange and return the response if there is one
119: */
120: protected void processInMessage() throws Exception {
121: // create the inbound exchange and possibly return the out message
122: byte[] inBytes = (new String(inMessage)).getBytes("ISO8859-1");
123: logger.debug("CrHandler processInMessage ("
124: + inMessage.length() + " bytes)\n"
125: + DumpNormalizedMessage.dumpBytesAsHex(inBytes));
126: byte[] outBytes = context.createInbound(inBytes);
127: byte[] crlf = { (byte) 0x0D, (byte) 0x0A };
128: context.sendSocket(crlf);
129: if (outBytes != null) {
130: context.sendSocket(outBytes);
131: context.sendSocket(crlf);
132: }
133: // reset the input state
134: inMessage = new StringBuffer();
135: context.setSocketTimeout(0);
136: }
137:
138: /*
139: * No special processing for a socket error
140: */
141: public void gotSocketError(Exception e) throws Exception {
142: }
143:
144: /*
145: * Send an in-only message
146: */
147: public void processInOnlyBytes(byte[] bytes) throws Exception {
148: logger.debug("Cr Handler processInOnlyBytes " + bytes.length
149: + " bytes)\n");
150: sendMessage(bytes);
151: }
152:
153: /*
154: * Send a message and return a reply
155: */
156: public byte[] processInOutBytes(byte[] bytes) throws Exception {
157: byte[] returnBytes = null;
158: logger.debug("Cr Handler processInOutBytes (" + bytes.length
159: + " bytes)\n");
160: sendMessage(bytes);
161: byte[] received;
162: context.setSocketTimeout(READ_TIMEOUT);
163: while ((received = context.receiveSocket()) != null) {
164: for (int bytesLeft = 0; (bytesLeft = received.length) > 0;) {
165: int consumed = receiveStateMachine(received);
166: if (gotInMessage)
167: break;
168: if (consumed > 0) {
169: byte[] newBytes = new byte[bytesLeft - consumed];
170: for (int i = 0; i < newBytes.length; i++)
171: newBytes[i] = received[consumed++];
172: received = newBytes;
173: }
174: }
175: if (gotInMessage) {
176: returnBytes = (new String(inMessage))
177: .getBytes("ISO8859-1");
178: inMessage = new StringBuffer();
179: inputState = INPUT_WANT_0D;
180: gotInMessage = false;
181: context.setSocketTimeout(0);
182: break;
183: }
184: }
185: return returnBytes;
186: }
187:
188: /*
189: * I don't use these since i'm using the byte array methods
190: */
191: public void processInOnlyExchange(MessageExchange e)
192: throws Exception {
193: }
194:
195: public void processInOutExchange(MessageExchange e)
196: throws Exception {
197: }
198:
199: /*
200: * find the first occurrence of a value in a byte array
201: */
202: protected int findByte(byte[] bytes, int start, byte want) {
203: int result = -1;
204: if (bytes != null) {
205: for (int i = start; i < bytes.length; i++)
206: if (bytes[i] == want) {
207: result = i;
208: break;
209: }
210: }
211: return result;
212: }
213:
214: /*
215: * Add the CR wrapper and send a message
216: */
217: protected void sendMessage(byte[] bytes) throws Exception {
218: byte[] message = new byte[bytes.length + 3];
219: int i;
220: for (i = 0; i < bytes.length; i++)
221: message[i] = bytes[i];
222: message[++i] = 0x0D;
223: message[++i] = 0x0A;
224: context.sendSocket(message);
225: }
226:
227: /*
228: * Detect messages in a 0B-->1C0D MLLP wrapper
229: */
230: protected int receiveStateMachine(byte[] bytes) throws Exception {
231: logger.debug("Cr Handler receiveStateMachine (" + bytes.length
232: + " bytes)\n");
233: int bytesConsumed = bytes.length;
234: int position = 0;
235: if (bytes.length < 1)
236: throw new Exception("zero bytes received");
237: switch (inputState) {
238: case INPUT_WANT_0D:
239: // capture everything up to then next 0x0D
240: int pos_0d = findByte(bytes, position, (byte) 0x0D);
241: if (pos_0d < 0) {
242: int length = bytes.length - position;
243: if (length > 0)
244: inMessage.append(new String(bytes, position,
245: length, "ISO8859-1"));
246: break;
247: }
248: inMessage.append(new String(bytes, position, pos_0d
249: - position, "ISO8859-1"));
250: inputState = INPUT_WANT_0A;
251: position = pos_0d + 1;
252: // don't break here since we want to process the next state
253:
254: case INPUT_WANT_0A:
255: // if the next byte is 0D then we have a message, otherwise keep capturing
256: if (position >= bytes.length)
257: break;
258: if (bytes[position] == (byte) 0x0A) {
259: gotInMessage = true;
260: bytesConsumed = position + 1;
261: } else {
262: inMessage.appendCodePoint(0x0D);
263: inputState = INPUT_WANT_0D;
264: bytesConsumed = position;
265: }
266: break;
267: }
268: logger.debug("Cr Handler receiveStateMachine gotInMessage "
269: + gotInMessage);
270: return bytesConsumed;
271: }
272:
273: }
|