001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2007 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: MllpHandler.java 6431 2007-03-28 17:59:14Z tvolle $
023: */
024: package com.bostechcorp.cbesb.custom.protocol;
025:
026: import javax.jbi.messaging.MessageExchange;
027:
028: import org.apache.commons.logging.Log;
029: import org.apache.commons.logging.LogFactory;
030:
031: import com.bostechcorp.cbesb.runtime.ccsl.lib.DumpNormalizedMessage;
032: import com.bostechcorp.cbesb.runtime.ccsl.lib.ITcpipContext;
033: import com.bostechcorp.cbesb.runtime.ccsl.lib.ITcpipHandler;
034:
035: public class MllpHandler implements ITcpipHandler {
036: protected static final short INPUT_WANT_0B = 0;
037: protected static final short INPUT_WANT_1C = 1;
038: protected static final short INPUT_WANT_0D = 2;
039: protected static final int READ_TIMEOUT = 7000;
040:
041: protected Log logger = LogFactory.getLog(getClass());
042: protected ITcpipContext context;
043: protected StringBuffer inMessage;
044: protected short inputState;
045: protected boolean gotInMessage;
046:
047: public String getDescription() {
048: return "TCPIP MLLP protocol handler";
049: }
050:
051: public void init(ITcpipContext ctx) {
052: context = ctx;
053: context.setSocketTimeout(0);
054: inMessage = new StringBuffer();
055: gotInMessage = false;
056: inputState = INPUT_WANT_0B;
057: }
058:
059: /*
060: * This handles received data in consumer mode.
061: */
062: public int gotReceiveData(byte[] bytes) throws Exception {
063: context.setSocketTimeout(READ_TIMEOUT);
064: int bytesConsumed = receiveStateMachine(bytes);
065: if (gotInMessage) {
066: // create the inbound exchange and possibly return the out message
067: byte[] inBytes = (new String(inMessage))
068: .getBytes("ISO8859-1");
069: logger.debug("got inbound message (" + inMessage.length()
070: + " bytes)\n"
071: + DumpNormalizedMessage.dumpBytesAsHex(inBytes));
072: byte[] outBytes = context.createInbound(inBytes);
073: if (outBytes != null)
074: sendMessage(outBytes);
075:
076: // reset the input state
077: inMessage = new StringBuffer();
078: inputState = INPUT_WANT_0B;
079: gotInMessage = false;
080: context.setSocketTimeout(0);
081: }
082: return bytesConsumed;
083: }
084:
085: /*
086: * Flush input state on a timeout
087: */
088: public void gotReceiveTimeout() throws Exception {
089: logger.info("TCPIP handler timed out on input");
090: inMessage = new StringBuffer();
091: inputState = INPUT_WANT_0B;
092: context.setSocketTimeout(0);
093: }
094:
095: /*
096: * No special processing for a socket error
097: */
098: public void gotSocketError(Exception e) throws Exception {
099: }
100:
101: /*
102: * Send an in-only message
103: */
104: public void processInOnlyBytes(byte[] bytes) throws Exception {
105: sendMessage(bytes);
106: }
107:
108: /*
109: * Send a message and return a reply
110: */
111: public byte[] processInOutBytes(byte[] bytes) throws Exception {
112: byte[] returnBytes = null;
113: sendMessage(bytes);
114: byte[] received;
115: context.setSocketTimeout(READ_TIMEOUT);
116: while ((received = context.receiveSocket()) != null) {
117: for (int bytesLeft = 0; (bytesLeft = received.length) > 0;) {
118: int consumed = receiveStateMachine(received);
119: if (gotInMessage)
120: break;
121: if (consumed > 0) {
122: byte[] newBytes = new byte[bytesLeft - consumed];
123: for (int i = 0; i < newBytes.length; i++)
124: newBytes[i] = received[consumed++];
125: received = newBytes;
126: }
127: }
128: if (gotInMessage) {
129: returnBytes = (new String(inMessage))
130: .getBytes("ISO8859-1");
131: inMessage = new StringBuffer();
132: inputState = INPUT_WANT_0B;
133: gotInMessage = false;
134: context.setSocketTimeout(0);
135: break;
136: }
137: }
138: return returnBytes;
139: }
140:
141: /*
142: * I don't use these since i'm using the byte array methods
143: */
144: public void processInOnlyExchange(MessageExchange e)
145: throws Exception {
146: }
147:
148: public void processInOutExchange(MessageExchange e)
149: throws Exception {
150: }
151:
152: /*
153: * Add the MLLP wrapper and send a message
154: */
155: protected void sendMessage(byte[] bytes) throws Exception {
156: byte[] message = new byte[bytes.length + 3];
157: message[0] = 0x0B;
158: int i;
159: for (i = 0; i < bytes.length; i++)
160: message[i + 1] = bytes[i];
161: message[++i] = 0x1C;
162: message[++i] = 0x0D;
163: context.sendSocket(message);
164: }
165:
166: /*
167: * Detect messages in a 0B-->1C0D MLLP wrapper
168: */
169: protected int receiveStateMachine(byte[] bytes) throws Exception {
170: int bytesConsumed = bytes.length;
171: int position = 0;
172: if (bytes.length < 1)
173: throw new Exception("zero bytes received");
174: switch (inputState) {
175: case INPUT_WANT_0B:
176: // ignore everything up to the first 0x0B
177: int pos_0b = findByte(bytes, position, (byte) 0x0B);
178: if (pos_0b < 0)
179: break;
180: position = pos_0b + 1;
181: inputState = INPUT_WANT_1C;
182: context.setSocketTimeout(READ_TIMEOUT);
183: // don't break here since we want to process the next state
184:
185: case INPUT_WANT_1C:
186: // capture everything up to then next 0x1C
187: int pos_1c = findByte(bytes, position, (byte) 0x1C);
188: if (pos_1c < 0) {
189: int length = bytes.length - position;
190: if (length > 0)
191: inMessage.append(new String(bytes, position,
192: length, "ISO8859-1"));
193: break;
194: }
195: inMessage.append(new String(bytes, position, pos_1c
196: - position, "ISO8859-1"));
197: inputState = INPUT_WANT_0D;
198: position = pos_1c + 1;
199: // don't break here since we want to process the next state
200:
201: case INPUT_WANT_0D:
202: // if the next byte is 0D then we have a message, otherwise keep capturing
203: if (position >= bytes.length)
204: break;
205: if (bytes[position] == (byte) 0x0D) {
206: gotInMessage = true;
207: bytesConsumed = position + 1;
208: } else {
209: inMessage.appendCodePoint(0x1C);
210: inputState = INPUT_WANT_1C;
211: bytesConsumed = position;
212: }
213: break;
214: }
215: return bytesConsumed;
216: }
217:
218: /*
219: * find the first occurrence of a value in a byte array
220: */
221: protected int findByte(byte[] bytes, int start, byte want) {
222: int result = -1;
223: if (bytes != null) {
224: for (int i = start; i < bytes.length; i++)
225: if (bytes[i] == want) {
226: result = i;
227: break;
228: }
229: }
230: return result;
231: }
232:
233: }
|