001: /*
002: * $Id: JettyHttpMessageReceiver.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.http.jetty;
012:
013: import org.mule.MuleServer;
014: import org.mule.RegistryContext;
015: import org.mule.api.MuleContext;
016: import org.mule.api.MuleException;
017: import org.mule.api.config.ThreadingProfile;
018: import org.mule.api.endpoint.EndpointBuilder;
019: import org.mule.api.endpoint.InboundEndpoint;
020: import org.mule.api.lifecycle.CreateException;
021: import org.mule.api.lifecycle.LifecycleException;
022: import org.mule.api.service.Service;
023: import org.mule.api.transport.Connector;
024: import org.mule.config.i18n.CoreMessages;
025: import org.mule.endpoint.EndpointURIEndpointBuilder;
026: import org.mule.transport.AbstractMessageReceiver;
027: import org.mule.transport.http.i18n.HttpMessages;
028: import org.mule.transport.http.servlet.MuleRESTReceiverServlet;
029: import org.mule.transport.http.servlet.ServletConnector;
030: import org.mule.util.StringUtils;
031:
032: import org.mortbay.http.HttpContext;
033: import org.mortbay.http.SocketListener;
034: import org.mortbay.jetty.Server;
035: import org.mortbay.jetty.servlet.ServletHandler;
036: import org.mortbay.util.InetAddrPort;
037:
038: /**
039: * <code>HttpMessageReceiver</code> is a simple http server that can be used to
040: * listen for http requests on a particular port
041: */
042: public class JettyHttpMessageReceiver extends AbstractMessageReceiver {
043: public static final String JETTY_SERVLET_CONNECTOR_NAME = "_jettyConnector";
044:
045: private Server httpServer;
046:
047: public JettyHttpMessageReceiver(Connector connector,
048: Service service, InboundEndpoint endpoint)
049: throws CreateException {
050:
051: super (connector, service, endpoint);
052:
053: if ("rest".equals(endpoint.getEndpointURI().getScheme())) {
054: // We need tohave a Servlet Connector pointing to our servlet so the Servlets can
055: // find the listeners for incoming requests
056: ServletConnector scon = (ServletConnector) RegistryContext
057: .getRegistry().lookupConnector(
058: JETTY_SERVLET_CONNECTOR_NAME);
059: if (scon != null) {
060: throw new CreateException(
061: HttpMessages
062: .noServletConnectorFound(JETTY_SERVLET_CONNECTOR_NAME),
063: this );
064: }
065:
066: scon = new ServletConnector();
067: scon.setName(JETTY_SERVLET_CONNECTOR_NAME);
068: scon.setServletUrl(endpoint.getEndpointURI().getAddress());
069: try {
070: MuleContext muleContext = MuleServer.getMuleContext();
071: scon.setMuleContext(muleContext);
072: //muleContext.applyLifecycle(scon);
073: muleContext.getRegistry().registerConnector(scon);
074:
075: String path = endpoint.getEndpointURI().getPath();
076: if (StringUtils.isEmpty(path)) {
077: path = "/";
078: }
079:
080: EndpointBuilder endpointBuilder = new EndpointURIEndpointBuilder(
081: "servlet://" + path.substring(1), connector
082: .getMuleContext());
083: endpointBuilder.setTransformers(endpoint
084: .getTransformers());
085: InboundEndpoint ep = connector.getMuleContext()
086: .getRegistry().lookupEndpointFactory()
087: .getInboundEndpoint(endpointBuilder);
088: scon.registerListener(service, ep);
089: } catch (Exception e) {
090: throw new CreateException(e, this );
091: }
092: }
093:
094: }
095:
096: protected void doConnect() throws Exception {
097: httpServer = new Server();
098: SocketListener socketListener = new SocketListener(
099: new InetAddrPort(endpoint.getEndpointURI().getPort()));
100:
101: // apply Threading settings
102: ThreadingProfile tp = connector.getReceiverThreadingProfile();
103: socketListener.setMaxIdleTimeMs((int) tp.getThreadTTL());
104: int threadsActive = tp.getMaxThreadsActive();
105: int threadsMin = socketListener.getMinThreads();
106: if (threadsMin >= threadsActive) {
107: socketListener.setMinThreads(threadsActive - 1);
108: }
109: socketListener.setMaxThreads(threadsActive);
110: // thread priorities are evil and gone from ThreadingProfile
111: // (google for priority inversion)
112: // socketListener.setThreadsPriority(tp.getThreadPriority());
113:
114: httpServer.addListener(socketListener);
115:
116: String path = endpoint.getEndpointURI().getPath();
117: if (StringUtils.isEmpty(path)) {
118: path = "/";
119: }
120:
121: if (!path.endsWith("/")) {
122: path += "/";
123: }
124:
125: HttpContext context = httpServer.getContext("/");
126: context.setRequestLog(null);
127:
128: ServletHandler handler = new ServletHandler();
129: if ("rest".equals(endpoint.getEndpointURI().getScheme())) {
130: handler.addServlet("MuleRESTReceiverServlet", path + "*",
131: MuleRESTReceiverServlet.class.getName());
132: } else {
133: handler.addServlet("JettyReceiverServlet", path + "*",
134: JettyReceiverServlet.class.getName());
135: }
136:
137: context.addHandler(handler);
138: // setAttribute is already synchronized in Jetty
139: context.setAttribute("messageReceiver", this );
140:
141: }
142:
143: protected void doDisconnect() throws Exception {
144: // stop is automativcally called by Mule
145: }
146:
147: /**
148: * Template method to dispose any resources associated with this receiver. There
149: * is not need to dispose the connector as this is already done by the framework
150: */
151: protected void doDispose() {
152: try {
153: httpServer.stop(false);
154: } catch (InterruptedException e) {
155: logger.error("Error disposing Jetty recevier on: "
156: + endpoint.getEndpointURI().toString(), e);
157: }
158: }
159:
160: protected void doStart() throws MuleException {
161: try {
162: httpServer.start();
163: } catch (Exception e) {
164: throw new LifecycleException(CoreMessages
165: .failedToStart("Jetty Http Receiver"), e, this );
166: }
167: }
168:
169: protected void doStop() throws MuleException {
170: try {
171: httpServer.stop(true);
172: } catch (InterruptedException e) {
173: throw new LifecycleException(CoreMessages
174: .failedToStop("Jetty Http Receiver"), e, this);
175: }
176: }
177:
178: }
|