001: /**
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
014: * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */package org.apache.cxf.transport.jbi;
019:
020: import java.io.IOException;
021: import java.io.InputStream;
022: import java.util.logging.Level;
023: import java.util.logging.Logger;
024:
025: import javax.jbi.messaging.DeliveryChannel;
026: import javax.jbi.messaging.ExchangeStatus;
027: import javax.jbi.messaging.MessageExchange;
028: import javax.jbi.messaging.NormalizedMessage;
029: import javax.xml.namespace.QName;
030:
031: import org.apache.cxf.common.logging.LogUtils;
032: import org.apache.cxf.message.MessageImpl;
033: import org.apache.cxf.transport.ConduitInitiator;
034:
035: public final class JBIDispatcherUtil {
036: private static final Logger LOG = LogUtils
037: .getL7dLogger(JBIDispatcherUtil.class);
038: private static JBIDispatcherUtil dispatchUtil;
039: private final DeliveryChannel channel;
040: private ConduitInitiator conduitInitiator;
041: private int activeEndpoints;
042: private boolean running;
043:
044: private JBIDispatcherUtil(ConduitInitiator ci, DeliveryChannel dc) {
045: this .conduitInitiator = ci;
046: this .channel = dc;
047: }
048:
049: public static synchronized JBIDispatcherUtil getInstance(
050: ConduitInitiator ci, DeliveryChannel dc) {
051: if (dispatchUtil == null) {
052: dispatchUtil = new JBIDispatcherUtil(ci, dc);
053: }
054: return dispatchUtil;
055:
056: }
057:
058: public static void clean() {
059: dispatchUtil = null;
060: }
061:
062: public void activateDispatch() {
063: activeEndpoints++;
064: if (!running && channel != null) {
065: new Thread(new JBIDispatcher()).start();
066: }
067: }
068:
069: public void startDispatch() {
070:
071: }
072:
073: public void deactivateDispatch() {
074: activeEndpoints--;
075: }
076:
077: protected Logger getLogger() {
078: return LOG;
079: }
080:
081: private class JBIDispatcher implements Runnable {
082:
083: public final void run() {
084:
085: try {
086: synchronized (channel) {
087: running = true;
088: }
089: getLogger().info(
090: new org.apache.cxf.common.i18n.Message(
091: "RECEIVE.THREAD.START", getLogger())
092: .toString());
093: do {
094: MessageExchange exchange = null;
095: synchronized (channel) {
096: try {
097: exchange = channel.accept();
098: } catch (Exception e) {
099: // ignore
100: }
101: }
102:
103: if (exchange != null
104: && exchange.getStatus() == ExchangeStatus.ACTIVE) {
105:
106: try {
107: getLogger()
108: .info(
109: new org.apache.cxf.common.i18n.Message(
110: "DISPATCH.TO.SU",
111: getLogger())
112: .toString());
113: dispatch(exchange);
114:
115: } finally {
116: //
117: }
118: }
119: } while (activeEndpoints > 0);
120: synchronized (channel) {
121: running = false;
122: }
123: } catch (Exception ex) {
124: getLogger().log(
125: Level.SEVERE,
126: new org.apache.cxf.common.i18n.Message(
127: "ERROR.DISPATCH.THREAD", getLogger())
128: .toString(), ex);
129: }
130: getLogger()
131: .fine(
132: new org.apache.cxf.common.i18n.Message(
133: "JBI.SERVER.TRANSPORT.MESSAGE.PROCESS.THREAD.EXIT",
134: getLogger()).toString());
135: }
136: }
137:
138: public void dispatch(MessageExchange exchange) throws IOException {
139:
140: QName opName = exchange.getOperation();
141: getLogger().info("dispatch method: " + opName);
142:
143: NormalizedMessage nm = exchange.getMessage("in");
144: try {
145:
146: MessageImpl inMessage = new MessageImpl();
147: inMessage.put(MessageExchange.class, exchange);
148:
149: final InputStream in = JBIMessageHelper
150: .convertMessageToInputStream(nm.getContent());
151: inMessage.setContent(InputStream.class, in);
152:
153: //dispatch to correct destination in case of multiple endpoint
154: inMessage
155: .setDestination(((JBITransportFactory) conduitInitiator)
156: .getDestination(exchange.getService()
157: .toString()
158: + exchange.getInterfaceName()
159: .toString()));
160: ((JBITransportFactory) conduitInitiator).getDestination(
161: exchange.getService().toString()
162: + exchange.getInterfaceName().toString())
163: .getMessageObserver().onMessage(inMessage);
164:
165: } catch (Exception ex) {
166: getLogger().log(
167: Level.SEVERE,
168: new org.apache.cxf.common.i18n.Message(
169: "ERROR.PREPARE.MESSAGE", getLogger())
170: .toString(), ex);
171: throw new IOException(ex.getMessage());
172: }
173:
174: }
175: }
|