001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.local;
019:
020: import java.io.IOException;
021: import java.io.InputStream;
022: import java.io.OutputStream;
023: import java.io.PipedInputStream;
024: import java.io.PipedOutputStream;
025: import java.util.logging.Logger;
026:
027: import org.apache.cxf.common.logging.LogUtils;
028: import org.apache.cxf.message.Exchange;
029: import org.apache.cxf.message.Message;
030: import org.apache.cxf.message.MessageImpl;
031: import org.apache.cxf.service.model.EndpointInfo;
032: import org.apache.cxf.transport.AbstractConduit;
033: import org.apache.cxf.transport.AbstractDestination;
034: import org.apache.cxf.transport.Conduit;
035: import org.apache.cxf.ws.addressing.EndpointReferenceType;
036:
037: public class LocalDestination extends AbstractDestination {
038:
039: private static final Logger LOG = LogUtils
040: .getL7dLogger(LocalDestination.class);
041:
042: private LocalTransportFactory localDestinationFactory;
043:
044: public LocalDestination(
045: LocalTransportFactory localDestinationFactory,
046: EndpointReferenceType epr, EndpointInfo ei) {
047: super (epr, ei);
048: this .localDestinationFactory = localDestinationFactory;
049: }
050:
051: public void shutdown() {
052: localDestinationFactory.remove(this );
053: }
054:
055: protected Logger getLogger() {
056: return LOG;
057: }
058:
059: @Override
060: protected Conduit getInbuiltBackChannel(Message inMessage) {
061: Conduit conduit = (Conduit) inMessage
062: .get(LocalConduit.IN_CONDUIT);
063: if (conduit instanceof LocalConduit) {
064: return new SynchronousConduit((LocalConduit) conduit);
065: }
066: return null;
067: }
068:
069: static class SynchronousConduit extends AbstractConduit {
070: private LocalConduit conduit;
071:
072: public SynchronousConduit(LocalConduit conduit) {
073: super (null);
074: this .conduit = conduit;
075: }
076:
077: public void prepare(final Message message) throws IOException {
078: if (!Boolean.TRUE.equals(message
079: .get(LocalConduit.DIRECT_DISPATCH))) {
080: final Exchange exchange = (Exchange) message
081: .getExchange().get(LocalConduit.IN_EXCHANGE);
082:
083: final PipedInputStream stream = new PipedInputStream();
084: final Runnable receiver = new Runnable() {
085: public void run() {
086: MessageImpl m = new MessageImpl();
087: if (exchange != null) {
088: exchange.setInMessage(m);
089: }
090: m.setContent(InputStream.class, stream);
091: conduit.getMessageObserver().onMessage(m);
092: }
093: };
094:
095: PipedOutputStream outStream = new PipedOutputStream(
096: stream);
097: message.setContent(OutputStream.class, outStream);
098:
099: new Thread(receiver).start();
100: }
101: }
102:
103: @Override
104: public void close(Message message) throws IOException {
105: if (Boolean.TRUE.equals(message
106: .get(LocalConduit.DIRECT_DISPATCH))) {
107: final Exchange exchange = (Exchange) message
108: .getExchange().get(LocalConduit.IN_EXCHANGE);
109: MessageImpl copy = new MessageImpl();
110: copy.putAll(message);
111: MessageImpl.copyContent(message, copy);
112:
113: if (exchange.getInMessage() == null) {
114: exchange.setInMessage(copy);
115: }
116:
117: conduit.getMessageObserver().onMessage(copy);
118: return;
119: }
120:
121: super .close(message);
122: }
123:
124: protected Logger getLogger() {
125: return LOG;
126: }
127: }
128:
129: }
|