001: /**
002: * JOnAS: Java(TM) Open Application Server
003: * Copyright (C) 2004 Bull S.A.
004: * Contact: jonas-team@objectweb.org
005: *
006: * This library is free software; you can redistribute it and/or modify it
007: * under the terms of the GNU Lesser General Public License as published by the
008: * Free Software Foundation; either version 2.1 of the License, or any later
009: * version.
010: *
011: * This library is distributed in the hope that it will be useful, but WITHOUT
012: * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
013: * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
014: * for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public License
017: * along with this library; if not, write to the Free Software Foundation,
018: * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
019: *
020: */package org.objectweb.jonas.discovery;
021:
022: import java.io.IOException;
023: import java.net.DatagramPacket;
024: import java.net.InetAddress;
025: import java.net.MulticastSocket;
026: import java.net.SocketException;
027: import java.net.UnknownHostException;
028:
029: import org.objectweb.jonas.common.Log;
030: import org.objectweb.jonas.management.j2eemanagement.J2EEDomain;
031: import org.objectweb.jonas.management.monitoring.DomainMonitor;
032:
033: import org.objectweb.util.monolog.api.BasicLevel;
034: import org.objectweb.util.monolog.api.Logger;
035:
036: /**
037: * @author <a href="mailto:Takoua.Abdellatif@inria.fr">Takoua Abdellatif </a>
038: * @version 1.0
039: */
040: public class DiscoveryListener implements Runnable {
041: /**
042: * Management notification type for <i>discovery</i> events
043: */
044: public static final String DISCOVERY_TYPE = "jonas.management.discovery";
045:
046: private static int RECEIVE_BUFFER_SIZE = 1024;
047: /**
048: * My enroller's listening port
049: */
050: private int port;
051: /**
052: * My enroller's multicast IP address
053: */
054: private InetAddress groupAddress;
055: /**
056: * Time-to-live for multicatst packets
057: */
058: private int ttl = 1; // why 1 ??
059: /**
060: * Used to receive multicasted discovery events
061: */
062: private MulticastSocket multicastSocket;
063: /**
064: * Socket state
065: */
066: private boolean notStopped = true;
067: private static Logger logger = Log
068: .getLogger(Log.JONAS_DISCOVERY_PREFIX);
069:
070: /**
071: * Constructs a DiscoveryListener associated to the Enroller
072: * @param enroller Enroller to which this thread is associated
073: */
074: public DiscoveryListener(Enroller enroller) {
075: this .port = enroller.getListeningPort();
076: try {
077: this .groupAddress = InetAddress.getByName(enroller
078: .getListeningIp());
079: this .ttl = enroller.getTimeToLive();
080: } catch (UnknownHostException e) {
081: logger.log(BasicLevel.ERROR, e);
082: }
083: }
084:
085: /**
086: * Creates a MulticastSocket and joins the group of multicas host
087: * identified by the InetAddress <code>groupAddress</code>
088: */
089: private void join() {
090: try {
091: multicastSocket = new MulticastSocket(port);
092: multicastSocket.setTimeToLive(ttl);
093: multicastSocket.joinGroup(groupAddress);
094: if (logger.isLoggable(BasicLevel.DEBUG)) {
095: logger.log(BasicLevel.DEBUG, "multicast ip address is "
096: + groupAddress);
097: logger.log(BasicLevel.DEBUG, "multicast port is "
098: + port);
099: }
100: } catch (IOException e) {
101: logger.log(BasicLevel.ERROR, "io problem");
102: // TODO Auto-generated catch block
103: e.printStackTrace();
104: }
105: }
106:
107: private DatagramPacket getDatagram(int length) {
108: return new DatagramPacket(new byte[length], length);
109: }
110:
111: public void run() {
112: // Join the group in order to receive multicast messages
113: join();
114: try {
115: while (notStopped) {
116: DatagramPacket datagram = getDatagram(RECEIVE_BUFFER_SIZE);
117: multicastSocket.receive(datagram);
118: Object objReceived = DiscoveryHelper
119: .bytesToObject(datagram.getData());
120: if (objReceived != null) {
121: if (objReceived instanceof DiscEvent) {
122: // Treat DiscEvents
123: DiscEvent event = (DiscEvent) objReceived;
124: // Trust the datagram packet for source address instead of the message.
125: event.setSourceAddress(datagram.getAddress()
126: .getHostAddress());
127: DomainMonitor dm = J2EEDomain.getInstance()
128: .getDomainMonitor();
129: if (dm == null) {
130: logger.log(BasicLevel.WARN,
131: "No DomainMonitor");
132: } else {
133: dm.discoveryNotification(event);
134: }
135: }
136: }
137: datagram = null;
138: }
139: } catch (SocketException e) {
140: logger.log(BasicLevel.ERROR, "Enroller: Socket closed" + e);
141: notStopped = false;
142: } catch (IOException e1) {
143: e1.printStackTrace();
144: } catch (ClassNotFoundException e) {
145: e.printStackTrace();
146: }
147: }
148:
149: public void stopListener() {
150: notStopped = false;
151: }
152:
153: }
|