001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.jbi;
019:
020: import java.io.ByteArrayInputStream;
021: import java.io.ByteArrayOutputStream;
022: import java.io.IOException;
023: import java.io.InputStream;
024: import java.lang.reflect.Member;
025: import java.lang.reflect.Method;
026: import java.util.logging.Logger;
027:
028: import javax.jbi.messaging.DeliveryChannel;
029: import javax.jbi.messaging.InOut;
030: import javax.jbi.messaging.MessageExchange;
031: import javax.jbi.messaging.MessageExchangeFactory;
032: import javax.jbi.messaging.NormalizedMessage;
033: import javax.jws.WebService;
034: import javax.xml.namespace.QName;
035: import javax.xml.transform.Source;
036: import javax.xml.transform.stream.StreamSource;
037:
038: import org.apache.cxf.common.logging.LogUtils;
039: import org.apache.cxf.io.CachedOutputStream;
040: import org.apache.cxf.message.Exchange;
041: import org.apache.cxf.message.Message;
042: import org.apache.cxf.message.MessageImpl;
043: import org.apache.cxf.service.model.BindingOperationInfo;
044: import org.apache.cxf.ws.addressing.EndpointReferenceType;
045: import org.apache.cxf.wsdl.EndpointReferenceUtils;
046:
047: public class JBIConduitOutputStream extends CachedOutputStream {
048:
049: private static final Logger LOG = LogUtils
050: .getL7dLogger(JBIConduitOutputStream.class);
051:
052: private Message message;
053: private boolean isOneWay;
054: private DeliveryChannel channel;
055: private JBIConduit conduit;
056: private EndpointReferenceType target;
057:
058: public JBIConduitOutputStream(Message m, DeliveryChannel channel,
059: EndpointReferenceType target, JBIConduit conduit) {
060: message = m;
061: this .channel = channel;
062: this .conduit = conduit;
063: this .target = target;
064:
065: }
066:
067: @Override
068: protected void doFlush() throws IOException {
069:
070: }
071:
072: @Override
073: protected void doClose() throws IOException {
074: isOneWay = message.getExchange().isOneWay();
075: commitOutputMessage();
076: if (target != null) {
077: target.getClass();
078: }
079: }
080:
081: private void commitOutputMessage() throws IOException {
082: try {
083: Member member = (Member) message
084: .get(Method.class.getName());
085: Class<?> clz = member.getDeclaringClass();
086: Exchange exchange = message.getExchange();
087: BindingOperationInfo bop = exchange
088: .get(BindingOperationInfo.class);
089:
090: LOG.info(new org.apache.cxf.common.i18n.Message(
091: "INVOKE.SERVICE", LOG).toString()
092: + clz);
093:
094: WebService ws = clz.getAnnotation(WebService.class);
095: assert ws != null;
096: QName interfaceName = new QName(ws.targetNamespace(), ws
097: .name());
098: QName serviceName = null;
099: if (target != null) {
100: serviceName = EndpointReferenceUtils
101: .getServiceName(target);
102: } else {
103: serviceName = message.getExchange().get(
104: org.apache.cxf.service.Service.class).getName();
105: }
106:
107: MessageExchangeFactory factory = channel
108: .createExchangeFactoryForService(serviceName);
109: LOG.info(new org.apache.cxf.common.i18n.Message(
110: "CREATE.MESSAGE.EXCHANGE", LOG).toString()
111: + serviceName);
112: MessageExchange xchng = null;
113: if (isOneWay) {
114: xchng = factory.createInOnlyExchange();
115: } else if (bop.getOutput() == null) {
116: xchng = factory.createRobustInOnlyExchange();
117: } else {
118: xchng = factory.createInOutExchange();
119: }
120:
121: NormalizedMessage inMsg = xchng.createMessage();
122: LOG.info(new org.apache.cxf.common.i18n.Message(
123: "EXCHANGE.ENDPOINT", LOG).toString()
124: + xchng.getEndpoint());
125: if (inMsg != null) {
126: LOG.info("setup message contents on " + inMsg);
127: inMsg.setContent(getMessageContent(message));
128: xchng.setService(serviceName);
129: LOG.info("service for exchange " + serviceName);
130:
131: xchng.setInterfaceName(interfaceName);
132:
133: xchng.setOperation(bop.getName());
134: xchng.setMessage(inMsg, "in");
135: LOG.info("sending message");
136: if (!isOneWay) {
137: channel.sendSync(xchng);
138: NormalizedMessage outMsg = ((InOut) xchng)
139: .getOutMessage();
140: Source content = null;
141: if (outMsg != null) {
142: content = outMsg.getContent();
143: } else {
144: content = ((InOut) xchng).getFault()
145: .getContent();
146: }
147: Message inMessage = new MessageImpl();
148: message.getExchange().setInMessage(inMessage);
149: InputStream ins = JBIMessageHelper
150: .convertMessageToInputStream(content);
151: if (ins == null) {
152: throw new IOException(
153: new org.apache.cxf.common.i18n.Message(
154: "UNABLE.RETRIEVE.MESSAGE", LOG)
155: .toString());
156: }
157: inMessage.setContent(InputStream.class, ins);
158:
159: conduit.getMessageObserver().onMessage(inMessage);
160:
161: } else {
162: channel.send(xchng);
163: }
164:
165: } else {
166: LOG.info(new org.apache.cxf.common.i18n.Message(
167: "NO.MESSAGE", LOG).toString());
168: }
169:
170: } catch (Exception e) {
171: e.printStackTrace();
172: throw new IOException(e.toString());
173: }
174: }
175:
176: private Source getMessageContent(Message message2) {
177: ByteArrayOutputStream bos = (ByteArrayOutputStream) getOut();
178: return new StreamSource(new ByteArrayInputStream(bos
179: .toByteArray()));
180:
181: }
182:
183: @Override
184: protected void onWrite() throws IOException {
185:
186: }
187:
188: }
|