001: /*
002: * Licensed to the Apache Software Foundation (ASF) under one
003: * or more contributor license agreements. See the NOTICE file
004: * distributed with this work for additional information
005: * regarding copyright ownership. The ASF licenses this file
006: * to you under the Apache License, Version 2.0 (the
007: * "License"); you may not use this file except in compliance
008: * with the License. You may obtain a copy of the License at
009: *
010: * http://www.apache.org/licenses/LICENSE-2.0
011: *
012: * Unless required by applicable law or agreed to in writing,
013: * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015: * KIND, either express or implied. See the License for the
016: * specific language governing permissions and limitations
017: * under the License.
018: */
019: package org.apache.synapse.transport.base;
020:
021: import org.apache.axis2.context.ConfigurationContext;
022: import org.apache.axis2.context.SessionContext;
023: import org.apache.axis2.context.MessageContext;
024: import org.apache.axis2.description.*;
025: import org.apache.axis2.AxisFault;
026: import org.apache.axis2.util.MessageContextBuilder;
027: import org.apache.synapse.transport.base.threads.WorkerPool;
028: import org.apache.synapse.transport.base.threads.WorkerPoolFactory;
029: import org.apache.axis2.transport.TransportListener;
030: import org.apache.axis2.engine.AxisEngine;
031: import org.apache.axis2.engine.AxisObserver;
032: import org.apache.axis2.engine.AxisConfiguration;
033: import org.apache.axis2.engine.AxisEvent;
034: import org.apache.axis2.addressing.EndpointReference;
035: import org.apache.commons.logging.Log;
036: import org.apache.commons.logging.LogFactory;
037: import org.apache.axiom.om.util.UUIDGenerator;
038: import org.apache.axiom.om.OMElement;
039:
040: import java.util.*;
041:
042: public abstract class AbstractTransportListener implements
043: TransportListener {
044:
045: /** the reference to the actual commons logger to be used for log messages */
046: protected Log log = null;
047:
048: /** the name of the transport */
049: protected String transportName = null;
050: /** the axis2 configuration context */
051: protected ConfigurationContext cfgCtx = null;
052: /** an axis2 engine over the above configuration context to process messages */
053: protected AxisEngine engine = null;
054:
055: /** transport in description */
056: private TransportInDescription transportIn = null;
057: /** transport out description */
058: private TransportOutDescription transportOut = null;
059: /** is this transport started? */
060: protected boolean started = false;
061: /** is this transport non-blocking? */
062: protected boolean isNonBlocking = false;
063: /** the axis observer that gets notified of service life cycle events*/
064: private final AxisObserver axisObserver = new GenericAxisObserver();
065:
066: /** the thread pool to execute actual poll invocations */
067: protected WorkerPool workerPool = null;
068: /** use the thread pool available in the axis2 configuration context */
069: protected boolean useAxis2ThreadPool = false;
070:
071: /**
072: * A constructor that makes subclasses pick up the correct logger
073: */
074: protected AbstractTransportListener() {
075: log = LogFactory.getLog(this .getClass());
076: }
077:
078: /**
079: * Initialize the generic transport. Sets up the transport and the thread pool to be used
080: * for message processing. Also creates an AxisObserver that gets notified of service
081: * life cycle events for the transport to act on
082: * @param cfgCtx the axis configuration context
083: * @param transportIn the transport-in description
084: * @throws AxisFault on error
085: */
086: public void init(ConfigurationContext cfgCtx,
087: TransportInDescription transportIn) throws AxisFault {
088:
089: this .cfgCtx = cfgCtx;
090: this .engine = new AxisEngine(cfgCtx);
091: this .transportIn = transportIn;
092: this .transportOut = cfgCtx.getAxisConfiguration()
093: .getTransportOut(transportName);
094:
095: if (useAxis2ThreadPool) {
096: //this.workerPool = cfgCtx.getThreadPool(); not yet implemented
097: throw new AxisFault(
098: "Unsupported thread pool for task execution - Axis2 thread pool");
099: } else {
100: this .workerPool = WorkerPoolFactory.getWorkerPool(10, 20,
101: 5, -1,
102: transportName + "Server Worker thread group",
103: transportName + "-Worker");
104: }
105:
106: // register to receive updates on services for lifetime management
107: cfgCtx.getAxisConfiguration().addObservers(axisObserver);
108: }
109:
110: public void destroy() {
111: try {
112: if (started) {
113: try {
114: stop();
115: } catch (AxisFault ignore) {
116: log.warn("Error stopping the transport : "
117: + transportName);
118: }
119: }
120: } finally {
121: started = false;
122: }
123: }
124:
125: public void stop() throws AxisFault {
126: if (started) {
127: started = false;
128: // cancel receipt of service lifecycle events
129: cfgCtx.getAxisConfiguration().getObserversList().remove(
130: axisObserver);
131: }
132: }
133:
134: public void start() throws AxisFault {
135: if (!started) {
136: started = true;
137: // register to receive updates on services for lifetime management
138: cfgCtx.getAxisConfiguration().addObservers(axisObserver);
139: }
140:
141: // iterate through deployed services and start
142: Iterator services = cfgCtx.getAxisConfiguration().getServices()
143: .values().iterator();
144:
145: while (services.hasNext()) {
146: AxisService service = (AxisService) services.next();
147: if (BaseUtils.isUsingTransport(service, transportName)) {
148: startListeningForService(service);
149: }
150: }
151: }
152:
153: protected abstract void startListeningForService(AxisService service);
154:
155: protected abstract void stopListeningForService(AxisService service);
156:
157: /**
158: * This is a deprecated method in Axis2 and this default implementation returns the first
159: * result from the getEPRsForService() method
160: */
161: public EndpointReference getEPRForService(String serviceName,
162: String ip) throws AxisFault {
163: return getEPRsForService(serviceName, ip)[0];
164: }
165:
166: public SessionContext getSessionContext(
167: MessageContext messageContext) {
168: return null;
169: }
170:
171: /**
172: * Create a new axis MessageContext for an incoming message through this transport
173: * @return the newly created message context
174: */
175: public MessageContext createMessageContext() {
176: MessageContext msgCtx = new MessageContext();
177: msgCtx.setConfigurationContext(cfgCtx);
178:
179: msgCtx.setIncomingTransportName(transportName);
180: msgCtx.setTransportOut(transportOut);
181: msgCtx.setTransportIn(transportIn);
182: msgCtx.setServerSide(true);
183: msgCtx.setMessageID(UUIDGenerator.getUUID());
184:
185: // There is a discrepency in what I thought, Axis2 spawns a nes threads to
186: // send a message is this is TRUE - and I want it to be the other way
187: msgCtx.setProperty(MessageContext.TRANSPORT_NON_BLOCKING,
188: Boolean.valueOf(!isNonBlocking));
189:
190: // are these relevant?
191: //msgCtx.setServiceGroupContextId(UUIDGenerator.getUUID());
192: // this is required to support Sandesha 2
193: //msgContext.setProperty(RequestResponseTransport.TRANSPORT_CONTROL,
194: // new HttpCoreRequestResponseTransport(msgContext));
195:
196: return msgCtx;
197: }
198:
199: /**
200: * Process a new incoming message through the axis engine
201: * @param msgCtx the axis MessageContext
202: * @param trpHeaders the map containing transport level message headers
203: * @param soapAction the optional soap action or null
204: * @param contentType the optional content-type for the message
205: */
206: public void handleIncomingMessage(MessageContext msgCtx,
207: Map trpHeaders, String soapAction, String contentType)
208: throws AxisFault {
209:
210: // set the soapaction if one is available via a transport header
211: if (soapAction != null) {
212: msgCtx.setSoapAction(soapAction);
213: }
214:
215: // set the transport headers to the message context
216: msgCtx
217: .setProperty(MessageContext.TRANSPORT_HEADERS,
218: trpHeaders);
219:
220: // send the message context through the axis engine
221: try {
222: try {
223: engine.receive(msgCtx);
224: } catch (AxisFault e) {
225: e.printStackTrace();
226: if (log.isDebugEnabled()) {
227: log.debug("Error receiving message", e);
228: }
229: if (msgCtx.isServerSide()) {
230: engine.sendFault(MessageContextBuilder
231: .createFaultMessageContext(msgCtx, e));
232: }
233: }
234: } catch (AxisFault axisFault) {
235: logException("Error processing received message", axisFault);
236: throw axisFault;
237: }
238: }
239:
240: protected void handleException(String msg, Exception e)
241: throws AxisFault {
242: log.error(msg, e);
243: throw new AxisFault(msg, e);
244: }
245:
246: protected void logException(String msg, Exception e) {
247: log.error(msg, e);
248: }
249:
250: public String getTransportName() {
251: return transportName;
252: }
253:
254: public void setTransportName(String transportName) {
255: this .transportName = transportName;
256: }
257:
258: /**
259: * An AxisObserver which will start listening for newly deployed or started services,
260: * and stop listening when services are undeployed or stopped.
261: */
262: class GenericAxisObserver implements AxisObserver {
263:
264: // The initilization code will go here
265: public void init(AxisConfiguration axisConfig) {
266: }
267:
268: public void serviceUpdate(AxisEvent event, AxisService service) {
269:
270: if (service.getName().startsWith("__")) {
271: return; // these are "private" services
272: }
273:
274: if (BaseUtils.isUsingTransport(service, transportName)) {
275: switch (event.getEventType()) {
276: case AxisEvent.SERVICE_DEPLOY:
277: startListeningForService(service);
278: break;
279: case AxisEvent.SERVICE_REMOVE:
280: stopListeningForService(service);
281: break;
282: case AxisEvent.SERVICE_START:
283: startListeningForService(service);
284: break;
285: case AxisEvent.SERVICE_STOP:
286: stopListeningForService(service);
287: break;
288: }
289: }
290: }
291:
292: public void moduleUpdate(AxisEvent event, AxisModule module) {
293: }
294:
295: public void addParameter(Parameter param) throws AxisFault {
296: }
297:
298: public void removeParameter(Parameter param) throws AxisFault {
299: }
300:
301: public void deserializeParameters(OMElement parameterElement)
302: throws AxisFault {
303: }
304:
305: public Parameter getParameter(String name) {
306: return null;
307: }
308:
309: public ArrayList getParameters() {
310: return null;
311: }
312:
313: public boolean isParameterLocked(String parameterName) {
314: return false;
315: }
316:
317: public void serviceGroupUpdate(AxisEvent event,
318: AxisServiceGroup serviceGroup) {
319: }
320: }
321:
322: }
|