001: /*
002: * $Id: UdpMessageReceiver.java 10961 2008-02-22 19:01:02Z dfeist $
003: * --------------------------------------------------------------------------------------
004: * Copyright (c) MuleSource, Inc. All rights reserved. http://www.mulesource.com
005: *
006: * The software in this package is published under the terms of the CPAL v1.0
007: * license, a copy of which has been included with this distribution in the
008: * LICENSE.txt file.
009: */
010:
011: package org.mule.transport.udp;
012:
013: import org.mule.DefaultMuleMessage;
014: import org.mule.api.MuleException;
015: import org.mule.api.MuleMessage;
016: import org.mule.api.config.MuleProperties;
017: import org.mule.api.endpoint.InboundEndpoint;
018: import org.mule.api.lifecycle.CreateException;
019: import org.mule.api.lifecycle.Disposable;
020: import org.mule.api.service.Service;
021: import org.mule.api.transport.Connector;
022: import org.mule.api.transport.MessageAdapter;
023: import org.mule.config.i18n.CoreMessages;
024: import org.mule.transport.AbstractMessageReceiver;
025: import org.mule.transport.ConnectException;
026: import org.mule.transport.udp.i18n.UdpMessages;
027:
028: import java.io.IOException;
029: import java.net.DatagramPacket;
030: import java.net.DatagramSocket;
031: import java.net.InetAddress;
032: import java.net.SocketAddress;
033: import java.net.SocketTimeoutException;
034: import java.net.URI;
035: import java.net.UnknownHostException;
036: import java.util.List;
037:
038: import javax.resource.spi.work.Work;
039: import javax.resource.spi.work.WorkException;
040: import javax.resource.spi.work.WorkManager;
041:
042: /** <code>UdpMessageReceiver</code> receives UDP message packets. */
043: public class UdpMessageReceiver extends AbstractMessageReceiver
044: implements Work {
045: protected DatagramSocket socket = null;
046: protected InetAddress inetAddress;
047: protected int bufferSize;
048: private URI uri;
049: protected List responseTransformers = null;
050:
051: public UdpMessageReceiver(Connector connector, Service service,
052: InboundEndpoint endpoint) throws CreateException {
053:
054: super (connector, service, endpoint);
055:
056: bufferSize = ((UdpConnector) connector).getReceiveBufferSize();
057:
058: uri = endpoint.getEndpointURI().getUri();
059:
060: try {
061: if (!"null".equalsIgnoreCase(uri.getHost())) {
062: inetAddress = InetAddress.getByName(uri.getHost());
063: }
064: } catch (UnknownHostException e) {
065: throw new CreateException(UdpMessages
066: .failedToLocateHost(uri), e, this );
067: }
068:
069: responseTransformers = getResponseTransformers();
070: }
071:
072: protected void doConnect() throws Exception {
073: try {
074: socket = ((UdpConnector) connector).getSocket(endpoint);
075: } catch (Exception e) {
076: throw new ConnectException(UdpMessages.failedToBind(uri),
077: e, this );
078: }
079:
080: try {
081: getWorkManager().scheduleWork(this , WorkManager.INDEFINITE,
082: null, connector);
083: } catch (WorkException e) {
084: throw new ConnectException(CoreMessages
085: .failedToScheduleWork(), e, this );
086: }
087: }
088:
089: protected void doDisconnect() throws Exception {
090: // this will cause the server thread to quit
091: disposing.set(true);
092: if (socket != null) {
093: socket.close();
094: }
095:
096: }
097:
098: protected void doStart() throws MuleException {
099: // nothing to do
100: }
101:
102: protected void doStop() throws MuleException {
103: // nothing to do
104: }
105:
106: protected List getResponseTransformers() {
107: List transformers = endpoint.getResponseTransformers();
108: if (transformers == null) {
109: return connector.getDefaultResponseTransformers();
110: }
111: return transformers;
112: }
113:
114: protected DatagramSocket createSocket(URI uri,
115: InetAddress inetAddress) throws IOException {
116: return new DatagramSocket(uri.getPort(), inetAddress);
117: }
118:
119: /** Obtain the serverSocket */
120: public DatagramSocket getSocket() {
121: return socket;
122: }
123:
124: protected DatagramPacket createPacket() {
125: DatagramPacket packet = new DatagramPacket(
126: new byte[bufferSize], bufferSize);
127: // if (uri.getPort() > 0)
128: // {
129: // packet.setPort(uri.getPort());
130: // }
131: // packet.setAddress(inetAddress);
132: return packet;
133: }
134:
135: public void run() {
136: while (!disposing.get()) {
137: if (connector.isStarted()) {
138:
139: try {
140: DatagramPacket packet = createPacket();
141: try {
142: if (logger.isDebugEnabled()) {
143: logger.debug("Receiving packet on " + uri);
144: }
145: socket.receive(packet);
146:
147: if (logger.isTraceEnabled()) {
148: logger.trace("Received packet on: " + uri);
149: }
150:
151: Work work = createWork(packet);
152: try {
153: getWorkManager().scheduleWork(work,
154: WorkManager.INDEFINITE, null,
155: connector);
156: } catch (WorkException e) {
157: logger.error("Udp receiver interrupted: "
158: + e.getMessage(), e);
159: }
160: } catch (SocketTimeoutException e) {
161: // ignore
162: }
163:
164: } catch (Exception e) {
165: if (!connector.isDisposed() && !disposing.get()) {
166: logger
167: .debug("Accept failed on socket: " + e,
168: e);
169: handleException(e);
170: }
171: }
172: }
173: }
174: }
175:
176: public void release() {
177: dispose();
178: }
179:
180: protected void doDispose() {
181: if (socket != null && !socket.isClosed()) {
182: logger.debug("Closing Udp connection: " + uri);
183: socket.close();
184: logger.info("Closed Udp connection: " + uri);
185: }
186: }
187:
188: protected Work createWork(DatagramPacket packet) throws IOException {
189: return new UdpWorker(new DatagramSocket(0), packet);
190: }
191:
192: protected class UdpWorker implements Work, Disposable {
193: private DatagramSocket socket = null;
194: private DatagramPacket packet;
195:
196: public UdpWorker(DatagramSocket socket, DatagramPacket packet) {
197: this .socket = socket;
198: this .packet = packet;
199: }
200:
201: public void release() {
202: dispose();
203: }
204:
205: public void dispose() {
206: if (socket != null && !socket.isClosed()) {
207: try {
208: socket.close();
209: } catch (Exception e) {
210: logger.error("Socket close failed", e);
211: }
212: }
213: socket = null;
214: }
215:
216: /** Accept requests from a given Udp address */
217: public void run() {
218: MuleMessage returnMessage = null;
219: try {
220: MessageAdapter adapter = connector
221: .getMessageAdapter(packet);
222: final SocketAddress clientAddress = socket
223: .getRemoteSocketAddress();
224: if (clientAddress != null) {
225: adapter.setProperty(
226: MuleProperties.MULE_REMOTE_CLIENT_ADDRESS,
227: clientAddress);
228: }
229: returnMessage = routeMessage(new DefaultMuleMessage(
230: adapter), endpoint.isSynchronous());
231:
232: if (returnMessage != null) {
233: byte[] data;
234: if (responseTransformers != null) {
235: returnMessage
236: .applyTransformers(responseTransformers);
237: Object response = returnMessage.getPayload();
238: if (response instanceof byte[]) {
239: data = (byte[]) response;
240: } else {
241: data = response.toString().getBytes();
242: }
243: } else {
244: data = returnMessage.getPayloadAsBytes();
245: }
246: DatagramPacket result = new DatagramPacket(data,
247: data.length, packet.getAddress(), packet
248: .getPort());
249: socket.send(result);
250: }
251: } catch (Exception e) {
252: if (!disposing.get()) {
253: handleException(e);
254: }
255: } finally {
256: dispose();
257: }
258: }
259: }
260: }
|