001: /**
002: * JOnAS: Java(TM) Open Application Server
003: * Copyright (C) 2004 Bull S.A.
004: * Contact: jonas-team@objectweb.org
005: *
006: * This library is free software; you can redistribute it and/or modify it
007: * under the terms of the GNU Lesser General Public License as published by the
008: * Free Software Foundation; either version 2.1 of the License, or any later
009: * version.
010: *
011: * This library is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
014: * for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public License
017: * along with this library; if not, write to the Free Software Foundation,
018: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
019: * --------------------------------------------------------------------------
020: * $Id: DiscoveryComm.java 9358 2006-08-04 11:46:51Z durieuxp $
021: * --------------------------------------------------------------------------
022: */package org.objectweb.jonas.discovery;
023:
024: import java.io.IOException;
025: import java.net.DatagramPacket;
026: import java.net.DatagramSocket;
027: import java.net.InetAddress;
028: import java.net.MulticastSocket;
029: import java.net.SocketException;
030: import java.net.UnknownHostException;
031:
032: import org.objectweb.jonas.common.Log;
033: import org.objectweb.jonas.common.NetUtils;
034: import org.objectweb.util.monolog.api.BasicLevel;
035: import org.objectweb.util.monolog.api.Logger;
036:
037: /**
038: * @author <a href="mailto:Takoua.Abdellatif@inria.fr">Takoua Abdellatif </a>
039: * @version 1.0
040: */
041: public class DiscoveryComm implements Runnable {
042: /**
043: * Size of buffer to read incoming packets into.
044: */
045: public static final int RECEIVE_BUFFER_SIZE = 1024;
046: /**
047: * Used to multicast a discovery event on run() execution
048: */
049: protected MulticastSocket multicastSocket;
050: /**
051: * Uset to send a discovery event as response to a discovey message
052: */
053: protected DatagramSocket unicastSocket;
054:
055: /**
056: * My manager's listening port
057: */
058: private int port;
059: /**
060: * My manager's multicast IP address
061: */
062: private InetAddress destAddress;
063: /**
064: * Set to false if the thread is stopped
065: */
066: protected boolean notStopped = true;
067: /**
068: * Time to live for multicatst packets
069: */
070: private int ttl = 1; // why 1 ??
071:
072: /**
073: * Name for this jonas instance.
074: */
075: protected String jonasName = null;
076: /**
077: * Domain name that this instance belongs to.
078: */
079: protected String domainName = null;
080:
081: /**
082: * The server ID of the jonas instance.
083: */
084: protected String serverId = null;
085:
086: /**
087: * MBean server connection URLs for this server.
088: */
089: protected String[] urls = null;
090:
091: /**
092: * logger
093: */
094: private static Logger logger = Log
095: .getLogger(Log.JONAS_DISCOVERY_PREFIX);
096:
097: /**
098: * Constructs a DiscoveryComm associated to the DiscoveryManager
099: * @param dm DiscoveryManager to which this thread is associated
100: */
101: public DiscoveryComm(DiscoveryManager dm) {
102: this .port = dm.getListeningPort();
103: try {
104: this .destAddress = InetAddress.getByName(dm
105: .getListeningIp());
106: this .ttl = dm.getTimeToLive();
107: this .jonasName = dm.getJonasName();
108: this .domainName = dm.getDomainName();
109: this .urls = dm.getUrls();
110: this .serverId = dm.getServerId();
111: } catch (UnknownHostException e) {
112: logger.log(BasicLevel.ERROR, "Unknown Host", e);
113: }
114: }
115:
116: /**
117: * Creates a MulticastSocket and joins the group of multicas host
118: * identified by the InetAddress <code>destAddress</code>
119: *
120: */
121: protected void join() {
122: try {
123: multicastSocket = new MulticastSocket(port);
124: multicastSocket.setTimeToLive(ttl);
125: multicastSocket.joinGroup(destAddress);
126: if (logger.isLoggable(BasicLevel.DEBUG)) {
127: logger.log(BasicLevel.DEBUG, "multicast ip address is "
128: + destAddress);
129: logger.log(BasicLevel.DEBUG, "multicast port is "
130: + port);
131: }
132: } catch (IOException e) {
133: logger.log(BasicLevel.ERROR, "io problem", e);
134: }
135: }
136:
137: /**
138: * sends (multicasts) a Discovery Message to the group.
139: * @param msg The message to send.
140: */
141: public void sendNotif(DiscMessage msg) {
142: try {
143: //send it on the multicast address
144: //after transforming the object to a datagram
145: if (logger.isLoggable(BasicLevel.DEBUG)) {
146: logger.log(BasicLevel.DEBUG, msg);
147: }
148: byte[] messageBytes = DiscoveryHelper.objectToBytes(msg);
149: multicastSocket.send(new DatagramPacket(messageBytes,
150: messageBytes.length, destAddress, port));
151: } catch (IOException e1) {
152: logger.log(BasicLevel.ERROR,
153: "DiscoveryComm: Error to send notification", e1);
154: }
155:
156: }
157:
158: /**
159: * Send response to a DiscoveryMessage
160: * @param msg Containes a DiscoveryMessage allowing to inform about the responder
161: * (name, state, URLs)
162: * @param destAddress the destination address picked up from the request
163: * @param port the destination port picked up from the request
164: */
165: protected void sendResponse(DiscMessage msg,
166: InetAddress destAddress, int port) {
167: if (logger.isLoggable(BasicLevel.DEBUG)) {
168: logger.log(BasicLevel.DEBUG,
169: "DiscoveryComm : The message to send is " + msg
170: + "Sending it to: " + destAddress
171: + " and port is: " + port);
172: }
173: try {
174: byte[] messageBytes = DiscoveryHelper.objectToBytes(msg);
175: if (messageBytes != null) {
176: // send the unicast response to the discovery client
177: unicastSocket.send(new DatagramPacket(messageBytes,
178: messageBytes.length, destAddress, port));
179: }
180: } catch (IOException e) {
181: logger
182: .log(
183: BasicLevel.ERROR,
184: "DiscoveryComm: Error to send response to discovery message",
185: e);
186: }
187: }
188:
189: /**
190: * Create a discovery event to notify about a state change of the
191: * event sender
192: * @param state
193: * - RUNNING if the sender notifies that it gets running
194: * - STOPPING if the sender notifies that it stops running
195: * @return a Discovery event (notification)
196: * @throws Exception
197: * is thrown if the jmx service is not reached.
198: */
199: public DiscEvent createNotifMessage(String state) throws Exception {
200: String theHostAddress;
201: try {
202: theHostAddress = NetUtils.getLocalAddress();
203: } catch (UnknownHostException e) {
204: logger.log(BasicLevel.ERROR, "Unknown host", e);
205: return null;
206: }
207:
208: if (!state.equals(DiscoveryState.RUNNING)) {
209: urls = null;
210: }
211: // In the case of a notification, the field port is not important since the
212: // notifier is not waiting for an acknowledfement.
213: DiscEvent resp = new DiscEvent(theHostAddress, port, jonasName,
214: domainName, serverId, urls);
215: resp.setState(state);
216: return resp;
217: }
218:
219: /**
220: * Construct a new datagram.
221: * @param length packets length to be received
222: * @return the created datagram
223: */
224: protected DatagramPacket getDatagram(int length) {
225: return new DatagramPacket(new byte[length], length);
226: }
227:
228: /**
229: *
230: * @see java.lang.Runnable#run()
231: */
232: public void run() {
233: // Join the group in order to receive multicast messages
234: join();
235: // Create notification message containing a discovery event with state RUNNING
236: DiscEvent discEventMsg = null;
237: try {
238: discEventMsg = createNotifMessage(DiscoveryState.RUNNING);
239: } catch (Exception e) {
240: logger
241: .log(
242: BasicLevel.ERROR,
243: "DiscoveryComm: Unable to create a notification message",
244: e);
245: }
246: if (discEventMsg != null) {
247: // Multicast the message
248: sendNotif(discEventMsg);
249: }
250: // Create the socket to be used for responding
251: try {
252: unicastSocket = new DatagramSocket();
253: } catch (SocketException e3) {
254: logger.log(BasicLevel.ERROR, "Socket exception", e3);
255: return;
256: }
257: try {
258: while (notStopped) {
259: DatagramPacket datagram = getDatagram(RECEIVE_BUFFER_SIZE);
260: multicastSocket.receive(datagram);
261: Object objReceived = DiscoveryHelper
262: .bytesToObject(datagram.getData());
263: if (objReceived != null) {
264: // The DiscEvents are ignored
265: if ((objReceived instanceof DiscEvent)
266: || (objReceived instanceof DiscGreeting)) {
267: if (logger.isLoggable(BasicLevel.DEBUG)) {
268: logger.log(BasicLevel.DEBUG,
269: "This discovery event/greeting is ignored "
270: + objReceived);
271: }
272: } else {
273: DiscMessage request = (DiscMessage) objReceived;
274: if (logger.isLoggable(BasicLevel.DEBUG)) {
275: logger.log(BasicLevel.DEBUG,
276: "A discovery message is received "
277: + objReceived);
278: }
279: if (discEventMsg != null) {
280: // Use the address in the datagram packet instead of trusting the message.
281: InetAddress destAddress = datagram
282: .getAddress();
283: int destPort = request.getSourcePort();
284: sendResponse(discEventMsg, destAddress,
285: destPort);
286: }
287: }
288: }
289: datagram = null;
290: }
291: } catch (SocketException e) {
292: logger.log(BasicLevel.ERROR, "Socket closed: ", e);
293: notStopped = false;
294: } catch (IOException e1) {
295: logger.log(BasicLevel.ERROR, e1);
296: } catch (ClassNotFoundException e) {
297: logger.log(BasicLevel.ERROR, e);
298: }
299: }
300:
301: /**
302: * sends a notification message to notify that the server is stopping.
303: *
304: */
305: public void stop() {
306: // send a notification message of type STOPPING
307: DiscEvent msg = null;
308: try {
309: if (logger.isLoggable(BasicLevel.DEBUG)) {
310: logger.log(BasicLevel.DEBUG,
311: "Sending a STOPPING DiscEvent.");
312: }
313: msg = createNotifMessage(DiscoveryState.STOPPING);
314: } catch (Exception e) {
315: logger.log(BasicLevel.ERROR, e);
316: }
317: if (msg != null) {
318: sendNotif(msg);
319: }
320: Thread.interrupted();
321: }
322:
323: /**
324: * @param jonasName The jonasName to set.
325: */
326: protected void setJonasName(String jonasName) {
327: this .jonasName = jonasName;
328: }
329:
330: /**
331: * @param domainName The domainName to set.
332: */
333: protected void setDomainName(String domainName) {
334: this .domainName = domainName;
335: }
336:
337: /**
338: * @param urls The urls to set.
339: */
340: protected void setUrls(String[] urls) {
341: this .urls = urls;
342: }
343:
344: public String getServerId() {
345: return serverId;
346: }
347:
348: public void setServerId(String serverId) {
349: this.serverId = serverId;
350: }
351: }
|