001: /**
002: * JOnAS: Java(TM) Open Application Server
003: * Copyright (C) 2004 Bull S.A.
004: * Contact: jonas-team@objectweb.org
005: *
006: * This library is free software; you can redistribute it and/or modify it
007: * under the terms of the GNU Lesser General Public License as published by the
008: * Free Software Foundation; either version 2.1 of the License, or any later
009: * version.
010: *
011: * This library is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
014: * for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public License
017: * along with this library; if not, write to the Free Software Foundation,
018: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
019: *
020: */package org.objectweb.jonas.discovery;
021:
022: import java.io.IOException;
023: import java.net.DatagramPacket;
024: import java.net.DatagramSocket;
025: import java.net.InetAddress;
026: import java.net.MulticastSocket;
027: import java.net.SocketException;
028: import java.net.UnknownHostException;
029:
030: import org.objectweb.jonas.common.Log;
031: import org.objectweb.jonas.jmx.JonasObjectName;
032: import org.objectweb.jonas.management.j2eemanagement.J2EEDomain;
033: import org.objectweb.jonas.management.monitoring.DomainMonitor;
034: import org.objectweb.jonas.service.ServiceException;
035:
036: import org.objectweb.util.monolog.api.BasicLevel;
037: import org.objectweb.util.monolog.api.Logger;
038:
039: /**
040: * @author <a href="mailto:Takoua.Abdellatif@inria.fr">Takoua Abdellatif </a>
041: * @author Adriana Danes
042: * @version 1.0
043: */
044: public class DiscoveryClientListener implements Runnable {
045:
046: /**
047: * Management notification type for <i>discovery</i> events
048: */
049: public static final String DISCOVERY_TYPE = "jonas.management.discovery";
050:
051: private static int RECEIVE_BUFFER_SIZE = 1024;
052:
053: /**
054: * Used to multicast a discovery message on run() execution
055: */
056: private MulticastSocket multicastSocket;
057:
058: /**
059: * Port associated to the multicast socket
060: */
061: private int port;
062:
063: /**
064: * IP address for the multicast socket
065: */
066: private InetAddress groupAddress;
067:
068: /**
069: * Time to live for multicatst packets
070: */
071: private int ttl;
072:
073: /**
074: * Uset to receive a discovery event as response to the sent discovey message
075: */
076: private DatagramSocket unicastSocket;
077:
078: /**
079: * Ip address which identify the sender of the discovery message
080: */
081: private String sourceIp;
082:
083: /**
084: * Port which identify by the sender of the discovery message
085: */
086: private int sourcePort;
087:
088: /**
089: * State information
090: */
091: private boolean notStopped = true;
092:
093: /**
094: * Time to wait for a response
095: */
096: private long timeout = 1000;
097:
098: /**
099: * Logger
100: */
101: private static Logger logger = Log
102: .getLogger(Log.JONAS_DISCOVERY_PREFIX);
103:
104: /**
105: * Constructs a DiscoveryClientListener associated with a DiscoveryClient
106: * @param discoveryClient DiscoveryClient to which this thread is associated
107: */
108: public DiscoveryClientListener(DiscoveryClient discoveryClient) {
109: this .port = discoveryClient.getListeningPort();
110: try {
111: this .groupAddress = InetAddress.getByName(discoveryClient
112: .getListeningIp());
113: this .ttl = discoveryClient.getTimeToLive();
114: } catch (UnknownHostException e) {
115: logger.log(BasicLevel.ERROR, "Invalid host", e);
116: }
117: this .timeout = discoveryClient.getTimeout();
118: this .sourcePort = discoveryClient.getSourcePort();
119: this .sourceIp = discoveryClient.getSourceIp();
120:
121: // Create a unicast socket to receive responses
122: // Do this in constructor so the exception can be caught
123: // and acted on before a new thread is created.
124: try {
125: unicastSocket = new DatagramSocket(sourcePort);
126: } catch (SocketException e2) {
127: logger
128: .log(
129: BasicLevel.ERROR,
130: "DiscoveryClient : Unable to create a Datagram socket",
131: e2);
132: // Could not create datagram socket, so throw a ServiceException.
133: throw new ServiceException(
134: "Could not create socket to listen for discovery "
135: + "messages at port: " + sourcePort
136: + ". The port might be in use.");
137: }
138: }
139:
140: /**
141: * Sends a discovery message to the server group.
142: */
143: public void sendDiscoveryMessage(DiscMessage msg) {
144: if (logger.isLoggable(BasicLevel.DEBUG)) {
145: logger.log(BasicLevel.DEBUG,
146: "DiscoveryClient : The message to send is " + msg);
147: }
148: // send the message on the multicast socket
149: // after packing it into a datagram
150: byte[] messageBytes = null;
151: try {
152: messageBytes = DiscoveryHelper.objectToBytes(msg);
153: if (messageBytes != null) {
154: multicastSocket.send(new DatagramPacket(messageBytes,
155: messageBytes.length, groupAddress, port));
156: }
157: } catch (IOException e) {
158: logger
159: .log(
160: BasicLevel.ERROR,
161: "DiscoveryClient : Error to send discovery message",
162: e);
163: }
164: }
165:
166: /**
167: * @see java.lang.Runnable#run()
168: */
169: public void run() {
170: // Create a multicast socket
171: try {
172: multicastSocket = new MulticastSocket(port);
173: multicastSocket.setTimeToLive(ttl);
174: } catch (IOException e) {
175: // TODO Auto-generated catch block
176: e.printStackTrace();
177: }
178:
179: // Prepare a discovery message
180: DiscMessage msg = new DiscMessage(sourceIp, sourcePort);
181: // First send a discovery message
182: sendDiscoveryMessage(msg);
183: if (logger.isLoggable(BasicLevel.DEBUG)) {
184: logger.log(BasicLevel.DEBUG,
185: " DiscoveryClient: Sent Message is" + msg);
186: }
187:
188: // wait for responses during a Timeout period on the unicast socket.
189: long lastTime = timeout + System.currentTimeMillis();
190: DiscEvent event = null;
191: try {
192: while ((notStopped)
193: && System.currentTimeMillis() <= lastTime) {
194: DatagramPacket datagram = getDatagram(RECEIVE_BUFFER_SIZE);
195: unicastSocket.receive(datagram);
196: Object objReceived = DiscoveryHelper
197: .bytesToObject(datagram.getData());
198: if (objReceived != null) {
199: if (objReceived instanceof DiscEvent) {
200: event = (DiscEvent) objReceived;
201: if (event.getDomainName().equals(
202: JonasObjectName.getDomain())) {
203: // Trust the datagram packet instead of the encoded URL
204: event.setSourceAddress(datagram
205: .getAddress().getHostAddress());
206: DomainMonitor dm = J2EEDomain.getInstance()
207: .getDomainMonitor();
208: if (dm == null) {
209: logger.log(BasicLevel.WARN,
210: "No DomainMonitor");
211: } else {
212: dm.discoveryNotification(event);
213: }
214: }
215: }
216: }
217: datagram = null;
218: }
219: } catch (SocketException e) {
220: logger.log(BasicLevel.ERROR,
221: "DiscoveryClient : Socket closed", e);
222: notStopped = false;
223: } catch (IOException e1) {
224: logger.log(BasicLevel.ERROR, "DiscoveryClient IOException",
225: e1);
226: } catch (ClassNotFoundException e) {
227: logger.log(BasicLevel.ERROR,
228: "DiscoveryClient ClassNotFoundException ", e);
229: }
230: }
231:
232: protected DatagramPacket getDatagram(int length) {
233: return new DatagramPacket(new byte[length], length);
234: }
235:
236: /**
237: * Stops the current thread
238: */
239: public void stop() {
240: notStopped = false;
241: Thread.interrupted();
242: }
243: }
|