001: /*
002: * ChainBuilder ESB
003: * Visual Enterprise Integration
004: *
005: * Copyright (C) 2007 Bostech Corporation
006: *
007: * This program is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU General Public License as published by the
009: * Free Software Foundation; either version 2 of the License, or (at your option)
010: * any later version.
011: *
012: * This program is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
014: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
015: * for more details.
016: *
017: * You should have received a copy of the GNU General Public License along with
018: * this program; if not, write to the Free Software Foundation, Inc.,
019: * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
020: *
021: *
022: * $Id: TcpipSocket.java 12040 2008-02-21 07:49:25Z lzheng $
023: */
024: package com.bostechcorp.cbesb.runtime.component.tcpip;
025:
026: import java.io.ByteArrayInputStream;
027: import java.io.ByteArrayOutputStream;
028: import java.io.IOException;
029: import java.io.InputStream;
030: import java.io.OutputStream;
031: import java.net.Socket;
032: import java.net.SocketException;
033: import java.net.SocketTimeoutException;
034: import java.net.URI;
035: import java.util.Vector;
036: import java.util.concurrent.TimeUnit;
037:
038: import javax.jbi.component.ComponentContext;
039: import javax.jbi.messaging.DeliveryChannel;
040: import javax.jbi.messaging.ExchangeStatus;
041: import javax.jbi.messaging.MessageExchange;
042: import javax.jbi.messaging.NormalizedMessage;
043: import javax.jbi.servicedesc.ServiceEndpoint;
044: import javax.xml.transform.Source;
045:
046: import org.apache.commons.logging.Log;
047: import org.apache.commons.logging.LogFactory;
048:
049: import com.bostechcorp.cbesb.common.constant.MetadataConstants;
050: import com.bostechcorp.cbesb.common.runtime.ResourcesConnectionException;
051: import com.bostechcorp.cbesb.common.util.ErrorUtil;
052: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.CbServiceUnit;
053: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.EndpointKeyUtil;
054: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ServiceDescriptionHandler;
055: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ServiceInfo;
056: import com.bostechcorp.cbesb.runtime.ccsl.jbi.messaging.ServiceUnitDescriptor;
057: import com.bostechcorp.cbesb.runtime.ccsl.lib.ExternalInput;
058: import com.bostechcorp.cbesb.runtime.ccsl.lib.ITcpipContext;
059: import com.bostechcorp.cbesb.runtime.ccsl.lib.ITcpipHandler;
060: import com.bostechcorp.cbesb.runtime.ccsl.lib.OutputWriter;
061: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.ByteArraySource;
062: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.NormalizedMessageHandler;
063: import com.bostechcorp.cbesb.runtime.ccsl.nmhandler.StringSource;
064: import com.bostechcorp.cbesb.runtime.component.util.wsdl.WsdlMepConstants;
065:
066: public class TcpipSocket extends Thread implements ITcpipContext {
067: protected final transient Log logger = LogFactory
068: .getLog(getClass());
069: private Socket socket;
070: private TcpipEndpoint endpoint;
071: private ITcpipHandler handler;
072: private InputStream sockin;
073: private OutputStream sockout;
074: private boolean isRunning = true;
075: protected ServiceDescriptionHandler targetSvcDescHandler;
076: protected ServiceDescriptionHandler providedSvcDescHandler;
077:
078: public static TcpipSocket getInstance(TcpipEndpoint ep, Socket sock)
079: throws Exception {
080: TcpipSocket inst = new TcpipSocket();
081: inst.endpoint = ep;
082: inst.socket = sock;
083: inst.handler = ep.getProtocolHandlerInstance();
084: if (inst.logger.isDebugEnabled())
085: inst.logger.debug("creating socket " + inst.socket
086: + " handler=" + inst.handler.getDescription()
087: + " " + inst.handler);
088: // start the thread running
089: inst.start();
090: return inst;
091: }
092:
093: public void forceStop() {
094: try {
095: isRunning = false;
096: socket.close();
097: } catch (Exception e) {
098:
099: ErrorUtil.printWarn("Error closing socket.", e);
100: }
101: }
102:
103: protected TcpipSocket() {
104: }
105:
106: /*
107: * Thread main method
108: */
109: public void run() {
110: logger.debug("starting socket thread");
111: try {
112: socket.setSoTimeout(0);
113: sockin = socket.getInputStream();
114: sockout = socket.getOutputStream();
115: } catch (IOException e) {
116: ErrorUtil
117: .printError(
118: "Can't get socket streams. Ending the socket thread...",
119: e);
120: isRunning = false;
121: }
122: handler.init(this );
123:
124: while (isRunning) {
125: try {
126:
127: if (endpoint.getConfig().getRole() == MessageExchange.Role.CONSUMER) {
128: doConsumer();
129: } else {
130: doProvider();
131: }
132: } catch (IOException ioe) {
133:
134: ErrorUtil
135: .printError(
136: "IOException in socket processor. Ending the socket thread...",
137: ioe);
138: isRunning = false;
139: ;
140: } catch (Exception e) {
141:
142: ErrorUtil
143: .printError(
144: "General Exception in socket processor. Ending the socket thread...",
145: e);
146: isRunning = false;
147: ;
148: }
149: }
150: if (logger.isDebugEnabled())
151: logger.debug("ending socket thread for" + socket);
152: endpoint.removeSocket(this );
153:
154: try {
155: socket.close();
156: } catch (IOException ioe) {
157: ErrorUtil.printWarn("Exception in closing socket.", ioe);
158: }
159: }
160:
161: /*
162: * Consumer processing - read data and pass to handler
163: */
164: private void doConsumer() throws Exception {
165: try {
166: long startTime = System.currentTimeMillis();
167: // get socket data
168: byte[] bytes = new byte[2048];
169: int l = sockin.read(bytes);
170: if (l < 1) {
171: throw new SocketException("socket disconnected");
172: }
173: byte[] result = null;
174: if (l < 2048) {
175: result = new byte[l];
176: for (int i = 0; i < l; i++)
177: result[i] = bytes[i];
178: } else
179: result = bytes;
180:
181: // send it to the handler
182: for (int bytesLeft = 0; (bytesLeft = result.length) > 0;) {
183: int consumed = handler.gotReceiveData(result);
184: if (consumed > 0) {
185: byte[] newBytes = new byte[bytesLeft - consumed];
186: for (int i = 0; i < newBytes.length; i++)
187: newBytes[i] = result[consumed++];
188: result = newBytes;
189: }
190: }
191: this .endpoint.sendMessageProcessedNotification(System
192: .currentTimeMillis()
193: - startTime);
194: logger.debug("tcpip consumer send one notification");
195: } catch (SocketTimeoutException toe) {
196: // pass timeout exceptions to the handler
197: handler.gotReceiveTimeout();
198: } catch (SocketException se) {
199: //
200: logger.info("Socket closed on host '"
201: + socket.getInetAddress().getCanonicalHostName()
202: + ":" + socket.getPort() + "'.");
203: throw se;
204: } catch (Exception e) {
205:
206: ErrorUtil.printError("Exception in doConsumer(): ", e);
207: throw e;
208: }
209: }
210:
211: /*
212: * Provider processing - wait for an exchange and call handler's provider method
213: */
214: private void doProvider() {
215: MessageExchange me = null;
216: // long startTime=System.currentTimeMillis();
217: try {
218: me = endpoint.getSendQueue().poll(10, TimeUnit.SECONDS);
219: if (me == null)
220: return;
221: } catch (InterruptedException e) {
222:
223: // ErrorUtil.printWarn("Exception in doProvider(): ",e);
224: // don't worry about interrupted, just continue
225: return;
226: }
227:
228: try {
229: URI mep = me.getPattern();
230:
231: if (isExchangeBasedProvider) {
232: // call the MessageExchange based handler
233: if (mep.compareTo(WsdlMepConstants.IN_ONLY) == 0) {
234: handler.processInOnlyExchange(me);
235: } else if (mep.compareTo(WsdlMepConstants.IN_OUT) == 0) {
236: handler.processInOutExchange(me);
237: } else
238: //throw new Exception("trying to process unknown MEP \""+mep+"\"");
239: handler.processInOnlyExchange(me);
240: } else {
241: // call the byte array based handler
242: ByteArrayOutputStream baos = new ByteArrayOutputStream();
243: NormalizedMessageHandler nmh = new NormalizedMessageHandler(
244: me.getMessage("in"));
245: Source src = nmh.getRecordAtIndex(0);
246: OutputWriter.processOutputStream(src, baos, "raw",
247: endpoint.getConfig().getCharset());
248: if (mep.compareTo(WsdlMepConstants.IN_ONLY) == 0) {
249: handler.processInOnlyBytes(baos.toByteArray());
250: } else if (mep.compareTo(WsdlMepConstants.IN_OUT) == 0) {
251: byte[] outBytes = handler.processInOutBytes(baos
252: .toByteArray());
253: if (outBytes != null) {
254: // determine the record type to generate
255: String recType;
256: if (endpoint.getConfig().getRecordType() != null)
257: recType = endpoint.getConfig()
258: .getRecordType();
259: else {
260: recType = "xml";
261: if (src instanceof StringSource)
262: recType = "string";
263: else if (src instanceof ByteArraySource)
264: recType = "binary";
265: }
266: // populate the out message
267: ExternalInput ext = new ExternalInput(
268: new ByteArrayInputStream(outBytes),
269: endpoint.getConfig().getCharset(),
270: "raw", recType, 0);
271: NormalizedMessage msg = me.createMessage();
272: ext.populateMessage(msg,
273: getProviderSvcDescHandlerInstance());
274: me.setMessage(msg, "out");
275: }
276: } else
277: //throw new Exception("trying to process unknown MEP \""+mep+"\"");
278: handler.processInOnlyBytes(baos.toByteArray());
279: }
280: // Notify the waiting provider processor that the exchange processing ended normally
281: endpoint.getReturnQueue().put(me);
282: //Here should not send notification, it already sent by LifeCycleEndpoint
283: //this.endpoint.sendMessageProcessedNotification(System.currentTimeMillis()-startTime);
284: //logger.info("tcpip provider send one notification"+this.endpoint.getEndpoint());
285: } catch (SocketException e) {
286:
287: // ErrorUtil.printError("Exception in doProvider(): ",e);
288: // Notify the waiting provider processor that the exchange processing got an exception
289: try {
290: endpoint
291: .getReturnQueue()
292: .put(
293: new ResourcesConnectionException(
294: "Socket exception on host '"
295: + socket
296: .getInetAddress()
297: .getCanonicalHostName()
298: + ":"
299: + socket.getPort()
300: + "'.",
301: "Check the connection.", e));
302: } catch (InterruptedException e1) {
303: // No need to worry about it. Ignore it for now since it will not happen in reality
304: }
305: this .forceStop();
306: } catch (Exception e) {
307:
308: // ErrorUtil.printError("Exception in doProvider(): ",e);
309: // Notify the waiting provider processor that the exchange processing got an exception
310: try {
311: endpoint.getReturnQueue().put(e);
312: } catch (InterruptedException e1) {
313: // No need to worry about it. Ignore it for now since it will not happen in reality
314: }
315: this .forceStop();
316: }
317: }
318:
319: protected ServiceDescriptionHandler getSvcDescHandlerInstance() {
320: if (targetSvcDescHandler == null) {
321: logger
322: .debug("Attempting to retreive Service Unit Descriptor");
323: ServiceUnitDescriptor suDescriptor = endpoint
324: .getServiceUnit().getServiceUnitDescriptor();
325: if (suDescriptor != null) {
326: Vector<ServiceInfo> consumes = suDescriptor
327: .getConsumes();
328: if (consumes.size() > 0) {
329: ServiceInfo svcInfo = consumes.elementAt(0);
330: logger.debug("Target Service Info: "
331: + svcInfo.toString());
332: ComponentContext context = endpoint
333: .getServiceUnit().getComponent()
334: .getComponentContext();
335: targetSvcDescHandler = ServiceDescriptionHandler
336: .getInstance(svcInfo, context);
337: }
338: }
339: }
340: return targetSvcDescHandler;
341: }
342:
343: protected ServiceDescriptionHandler getProviderSvcDescHandlerInstance() {
344: if (providedSvcDescHandler == null) {
345: if (endpoint.getDescription() != null) {
346: providedSvcDescHandler = ServiceDescriptionHandler
347: .getInstance(endpoint.getDescription());
348: }
349: }
350: return providedSvcDescHandler;
351: }
352:
353: /*
354: * ITcpipContext Implementation
355: */
356: private boolean isExchangeBasedProvider = false;
357: private boolean isAsyncSend = false;
358:
359: public void sendSocket(byte[] bytes) throws Exception {
360: sockin.available(); // this gets an exception if the peer disconnected.
361: sockout.write(bytes);
362: sockout.flush();
363: }
364:
365: public byte[] receiveSocket() throws Exception {
366: byte[] bytes = new byte[2048];
367: int l;
368: l = sockin.read(bytes);
369: if (l < 1) {
370: throw new SocketException("socket disconnected");
371: }
372: byte[] result = null;
373: if (l < 2048) {
374: result = new byte[l];
375: for (int i = 0; i < l; i++)
376: result[i] = bytes[i];
377: } else
378: result = bytes;
379: return result;
380: }
381:
382: public void setSocketTimeout(int timeOutMillis) {
383: try {
384: socket.setSoTimeout(timeOutMillis);
385: } catch (Exception e) {
386:
387: ErrorUtil
388: .printError("Exception in setSocketTimeout(): ", e);
389: }
390: }
391:
392: public byte[] createInbound(byte[] bytes) throws Exception {
393: byte[] returnBytes = null;
394: MessageExchange me = null;
395: DeliveryChannel channel = endpoint.getDeliveryChannel();
396: CbServiceUnit su = endpoint.getServiceUnit();
397: ComponentContext context = su.getComponent()
398: .getComponentContext();
399:
400: // create a message exchange
401: logger.debug("createInbound, DefaultMEP :"
402: + endpoint.getConfig().getDefaultMep());
403: URI defaultMep = endpoint.getConfig().getDefaultMep();
404: if (defaultMep.compareTo(WsdlMepConstants.IN_ONLY) == 0) {
405: me = channel.createExchangeFactory().createInOnlyExchange();
406: } else if (defaultMep.compareTo(WsdlMepConstants.IN_OUT) == 0) {
407: me = channel.createExchangeFactory().createInOutExchange();
408: } else if (defaultMep
409: .compareTo(WsdlMepConstants.ROBUST_IN_ONLY) == 0) {
410: me = channel.createExchangeFactory()
411: .createRobustInOnlyExchange();
412: } else
413: throw new Exception("trying to process unknown MEP \""
414: + defaultMep + "\"");
415:
416: // populate the exchange and send it into the container
417: String endpointKey = EndpointKeyUtil.getKey(endpoint);
418: me.setProperty(MetadataConstants.SENDER_ENDPOINT_PROPERTY,
419: endpointKey);
420: ExternalInput ext = new ExternalInput(new ByteArrayInputStream(
421: bytes), endpoint.getConfig().getCharset(), "raw",
422: endpoint.getConfig().getRecordType(), 0);
423: NormalizedMessage msg = me.createMessage();
424: ext.populateMessage(msg, getSvcDescHandlerInstance());
425: me.setMessage(msg, "in");
426: logger.debug("Consumer endpoint service="
427: + endpoint.getService() + " endpoint="
428: + endpoint.getEndpoint());
429: ServiceEndpoint linkedEndpoint = context.getEndpoint(endpoint
430: .getService(), endpoint.getEndpoint());
431: logger.debug("Got target endpoint " + linkedEndpoint
432: + " service=" + linkedEndpoint.getServiceName()
433: + " endpoint=" + linkedEndpoint.getEndpointName());
434: me.setEndpoint(linkedEndpoint);
435: me.setService(endpoint.getService());
436: if (isAsyncSend) {
437: // do an asynchronous send, no return bytes
438: channel.send(me);
439: } else {
440: // do synchronous send and check for return bytes
441: channel.sendSync(me);
442:
443: // populate the return message
444: if (ExchangeStatus.ACTIVE.equals(me.getStatus())) {
445: msg = me.getMessage("out");
446: if (msg != null) {
447: NormalizedMessageHandler nmh = new NormalizedMessageHandler(
448: msg);
449: if (nmh.getRecordCount() > 0) {
450: Source src = nmh.getRecordAtIndex(0);
451: ByteArrayOutputStream os = new ByteArrayOutputStream();
452: OutputWriter.processOutputStream(src, os,
453: "raw", endpoint.getConfig()
454: .getCharset());
455: returnBytes = os.toByteArray();
456: }
457: }
458: }
459: }
460: return returnBytes;
461: }
462:
463: public void setExchangeBasedProvider(boolean isEnabled) {
464: logger.debug("setting MessageExchange based provider");
465: isExchangeBasedProvider = isEnabled;
466: }
467:
468: public void setIsAsyncSend(boolean isAsync) {
469: isAsyncSend = isAsync;
470: }
471: }
|