0001: package org.jgroups.protocols;
0002:
0003: import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
0004: import org.jgroups.*;
0005: import org.jgroups.stack.Protocol;
0006: import org.jgroups.stack.IpAddress;
0007: import org.jgroups.util.*;
0008: import org.jgroups.util.List;
0009: import org.jgroups.util.Queue;
0010:
0011: import java.io.DataInputStream;
0012: import java.io.IOException;
0013: import java.net.*;
0014: import java.text.NumberFormat;
0015: import java.util.*;
0016:
0017: /**
0018: * Generic transport - specific implementations should extend this abstract class.
0019: * Features which are provided to the subclasses include
0020: * <ul>
0021: * <li>version checking
0022: * <li>marshalling and unmarshalling
0023: * <li>message bundling (handling single messages, and message lists)
0024: * <li>incoming packet handler
0025: * <li>loopback
0026: * </ul>
0027: * A subclass has to override
0028: * <ul>
0029: * <li>{@link #sendToAllMembers(byte[], int, int)}
0030: * <li>{@link #sendToSingleMember(org.jgroups.Address, byte[], int, int)}
0031: * <li>{@link #init()}
0032: * <li>{@link #start()}: subclasses <em>must</em> call super.start() <em>after</em> they initialize themselves
0033: * (e.g., created their sockets).
0034: * <li>{@link #stop()}: subclasses <em>must</em> call super.stop() after they deinitialized themselves
0035: * <li>{@link #destroy()}
0036: * </ul>
0037: * The create() or start() method has to create a local address.<br>
0038: * The {@link #receive(Address, Address, byte[], int, int)} method must
0039: * be called by subclasses when a unicast or multicast message has been received.
0040: * @author Bela Ban
0041: * @version $Id: TP.java,v 1.77.2.3 2007/04/27 08:03:51 belaban Exp $
0042: */
0043: public abstract class TP extends Protocol {
0044:
/** The address (host and port) of this member */
protected Address local_addr = null;

/** The name of the group to which this member is connected */
protected String channel_name = null;

/** The interface (NIC) which should be used by this transport */
protected InetAddress bind_addr = null;

/** Overrides bind_addr, -Djgroups.bind_addr and -Dbind.address: lets the OS return the local host address */
boolean use_local_host = false;

/** If true, the transport should use all available interfaces to receive multicast messages
 * @deprecated Use {@link #receive_on_all_interfaces} instead */
boolean bind_to_all_interfaces = false;

/** If true, the transport should use all available interfaces to receive multicast messages */
boolean receive_on_all_interfaces = false;

/** List&lt;NetworkInterface&gt; of interfaces to receive multicasts on. The multicast receive socket will listen
 * on all of these interfaces. Configured as a comma-separated list of IP addresses or interface names, e.g.
 * "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to an interface once.
 * If this property is set, it overrides receive_on_all_interfaces.
 */
java.util.List receive_interfaces = null;

/** If true, the transport should use all available interfaces to send multicast messages. This means
 * the same multicast message is sent N times, so use with care */
boolean send_on_all_interfaces = false;

/** List&lt;NetworkInterface&gt; of interfaces to send multicasts on. The multicast send socket will send the
 * same multicast message on all of these interfaces. Configured as a comma-separated list of IP addresses or
 * interface names, e.g. "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded.
 * If this property is set, it overrides send_on_all_interfaces.
 */
java.util.List send_interfaces = null;

/** The port to which the transport binds. 0 means to bind to any (ephemeral) port */
int bind_port = 0;
/** Number of ports to try; set from the "port_range" property (NOTE(review): presumably counted upwards from bind_port - confirm in subclasses) */
int port_range = 1; // 27-6-2003 bgooren, Only try one port by default

/** The members of this group (updated when a member joins or leaves) */
final protected Vector members = new Vector(11);

/** The current view; included in the diagnostics output */
protected View view = null;

/** Pre-allocated byte stream. Used for marshalling messages. Will grow as needed */
final ExposedByteArrayOutputStream out_stream = new ExposedByteArrayOutputStream(
        1024);
final ExposedBufferedOutputStream buf_out_stream = new ExposedBufferedOutputStream(
        out_stream, 1024);
final ExposedDataOutputStream dos = new ExposedDataOutputStream(
        buf_out_stream);

/** Shared input streams, re-pointed at each received buffer for unmarshalling in handleIncomingPacket();
 * access is serialized by synchronizing on in_stream */
final ExposedByteArrayInputStream in_stream = new ExposedByteArrayInputStream(
        new byte[] { '0' });
final ExposedBufferedInputStream buf_in_stream = new ExposedBufferedInputStream(
        in_stream);
final DataInputStream dis = new DataInputStream(buf_in_stream);
0104:
/** If true, messages sent to self are treated specially: unicast messages are
 * looped back immediately, multicast messages get a local copy first and -
 * when the real copy arrives - it will be discarded. Useful for Windows
 * media (non)sense */
boolean loopback = false;

/** Discard packets with a different version. Usually minor version differences are okay. Setting this property
 * to true means that we expect the exact same version on all incoming packets */
boolean discard_incompatible_packets = false;

/** Sometimes receivers are overloaded (they have to handle de-serialization etc).
 * Packet handler is a separate thread taking care of de-serialization, receiver
 * thread(s) simply put packet in queue and return immediately. Setting this to
 * true adds one more thread */
boolean use_incoming_packet_handler = true;

/** Used by the packet handler to store incoming packets (receive() adds IncomingQueueEntry instances).
 * Created in start() when use_incoming_packet_handler is true */
Queue incoming_packet_queue = null;

/** Dequeues packets from incoming_packet_queue and unmarshalls them
 * (NOTE(review): the handler class is outside the visible part of the file - confirm details there) */
IncomingPacketHandler incoming_packet_handler = null;

/** Used to store incoming Messages. Only created in start() when loopback is true */
Queue incoming_msg_queue = null;

/** Handler for incoming_msg_queue; created and started in start() when loopback is true */
IncomingMessageHandler incoming_msg_handler;

/** Packets to be sent are stored in outgoing_queue and sent by a separate thread. Enabling this
 * value uses an additional thread */
boolean use_outgoing_packet_handler = false;

/** Used by the outgoing packet handler; down() puts outgoing Messages into it */
BoundedLinkedQueue outgoing_queue = null;

/** max number of elements in the bounded outgoing_queue */
int outgoing_queue_max_size = 2000;

/** Handler for outgoing_queue; created and started in start() when use_outgoing_packet_handler is true */
OutgoingPacketHandler outgoing_packet_handler = null;

/** If set it will be added to <tt>local_addr</tt>. Used to implement
 * for example transport independent addresses */
byte[] additional_data = null;

/** Maximum number of bytes for messages to be queued until they are sent. This value needs to be smaller
 than the largest datagram packet size in case of UDP */
int max_bundle_size = AUTOCONF.senseMaxFragSizeStatic();

/** Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size or
 * max_bundle_timeout has been exceeded (whichever occurs faster)
 */
long max_bundle_timeout = 20;

/** Enables bundling of smaller messages into bigger ones */
boolean enable_bundling = false;

/** Created in start() when enable_bundling is true */
private Bundler bundler = null;

/** Taken from the protocol stack (stack.timer) in start() */
protected TimeScheduler timer = null;

/** Created and started in start() when enable_diagnostics is true; stopped and cleared in stop() */
private DiagnosticsHandler diag_handler = null;
boolean enable_diagnostics = true;
String diagnostics_addr = "224.0.0.75";
int diagnostics_port = 7500;

/** HashMap&lt;Address, Address&gt;. Keys=senders, values=destinations. For each incoming message M with sender S, adds
 * an entry with key=S and value= sender's IP address and port.
 */
HashMap addr_translation_table = new HashMap();

/** NOTE(review): presumably enables use of addr_translation_table; its use is outside the visible part of the file */
boolean use_addr_translation = false;

/** When non-null, down() attaches this header to every outgoing message under the key {@link #name} */
TpHeader header;

/** Protocol name as returned by getName(); used as the header key and in toString() */
final String name = getName();

static final byte LIST = 1; // we have a list of messages rather than a single message when set
static final byte MULTICAST = 2; // message is a multicast (versus a unicast) message when set

/** Traffic counters, reset by resetStats() and exported by dumpStats() */
long num_msgs_sent = 0, num_msgs_received = 0, num_bytes_sent = 0,
        num_bytes_received = 0;

/** Shared number format: no grouping separators, at most 2 fraction digits */
static NumberFormat f;

static {
    f = NumberFormat.getNumberInstance();
    f.setGroupingUsed(false);
    f.setMaximumFractionDigits(2);
}
0194:
/**
 * Constructs the transport. The constructor body does nothing; handler threads are only
 * created later, in {@link #start()}.
 */
protected TP() {
    // intentionally empty
}
0201:
0202: /**
0203: * debug only
0204: */
0205: public String toString() {
0206: return name + "(local address: " + local_addr + ')';
0207: }
0208:
0209: public void resetStats() {
0210: num_msgs_sent = num_msgs_received = num_bytes_sent = num_bytes_received = 0;
0211: }
0212:
0213: public long getNumMessagesSent() {
0214: return num_msgs_sent;
0215: }
0216:
0217: public long getNumMessagesReceived() {
0218: return num_msgs_received;
0219: }
0220:
0221: public long getNumBytesSent() {
0222: return num_bytes_sent;
0223: }
0224:
0225: public long getNumBytesReceived() {
0226: return num_bytes_received;
0227: }
0228:
0229: public String getBindAddress() {
0230: return bind_addr != null ? bind_addr.toString() : "null";
0231: }
0232:
0233: public void setBindAddress(String bind_addr)
0234: throws UnknownHostException {
0235: this .bind_addr = InetAddress.getByName(bind_addr);
0236: }
0237:
0238: /** @deprecated Use {@link #isReceiveOnAllInterfaces()} instead */
0239: public boolean getBindToAllInterfaces() {
0240: return receive_on_all_interfaces;
0241: }
0242:
0243: public void setBindToAllInterfaces(boolean flag) {
0244: this .receive_on_all_interfaces = flag;
0245: }
0246:
0247: public boolean isReceiveOnAllInterfaces() {
0248: return receive_on_all_interfaces;
0249: }
0250:
0251: public java.util.List getReceiveInterfaces() {
0252: return receive_interfaces;
0253: }
0254:
0255: public boolean isSendOnAllInterfaces() {
0256: return send_on_all_interfaces;
0257: }
0258:
0259: public java.util.List getSendInterfaces() {
0260: return send_interfaces;
0261: }
0262:
0263: public boolean isDiscardIncompatiblePackets() {
0264: return discard_incompatible_packets;
0265: }
0266:
0267: public void setDiscardIncompatiblePackets(boolean flag) {
0268: discard_incompatible_packets = flag;
0269: }
0270:
0271: public boolean isEnableBundling() {
0272: return enable_bundling;
0273: }
0274:
0275: public void setEnableBundling(boolean flag) {
0276: enable_bundling = flag;
0277: }
0278:
0279: public int getMaxBundleSize() {
0280: return max_bundle_size;
0281: }
0282:
0283: public void setMaxBundleSize(int size) {
0284: max_bundle_size = size;
0285: }
0286:
0287: public long getMaxBundleTimeout() {
0288: return max_bundle_timeout;
0289: }
0290:
0291: public void setMaxBundleTimeout(long timeout) {
0292: max_bundle_timeout = timeout;
0293: }
0294:
0295: public int getOutgoingQueueSize() {
0296: return outgoing_queue != null ? outgoing_queue.size() : 0;
0297: }
0298:
0299: public int getIncomingQueueSize() {
0300: return incoming_packet_queue != null ? incoming_packet_queue
0301: .size() : 0;
0302: }
0303:
0304: public Address getLocalAddress() {
0305: return local_addr;
0306: }
0307:
0308: public String getChannelName() {
0309: return channel_name;
0310: }
0311:
0312: public boolean isLoopback() {
0313: return loopback;
0314: }
0315:
0316: public void setLoopback(boolean b) {
0317: loopback = b;
0318: }
0319:
0320: public boolean isUseIncomingPacketHandler() {
0321: return use_incoming_packet_handler;
0322: }
0323:
0324: public boolean isUseOutgoingPacketHandler() {
0325: return use_outgoing_packet_handler;
0326: }
0327:
0328: public int getOutgoingQueueMaxSize() {
0329: return outgoing_queue != null ? outgoing_queue_max_size : 0;
0330: }
0331:
0332: public void setOutgoingQueueMaxSize(int new_size) {
0333: if (outgoing_queue != null) {
0334: outgoing_queue.setCapacity(new_size);
0335: outgoing_queue_max_size = new_size;
0336: }
0337: }
0338:
0339: public Map dumpStats() {
0340: Map retval = super .dumpStats();
0341: if (retval == null)
0342: retval = new HashMap();
0343: retval.put("num_msgs_sent", new Long(num_msgs_sent));
0344: retval.put("num_msgs_received", new Long(num_msgs_received));
0345: retval.put("num_bytes_sent", new Long(num_bytes_sent));
0346: retval.put("num_bytes_received", new Long(num_bytes_received));
0347: return retval;
0348: }
0349:
/**
 * Send to all members in the group. UDP would use an IP multicast message, whereas TCP would send N
 * messages, one for each member
 * @param data The data to be sent. This is not a copy, so don't modify it
 * @param offset Position in <tt>data</tt> of the first byte to send (presumably; confirm against implementations)
 * @param length Number of bytes to send (presumably; confirm against implementations)
 * @throws Exception If sending fails
 */
public abstract void sendToAllMembers(byte[] data, int offset,
        int length) throws Exception;
0360:
/**
 * Send to a single member of the group (unicast).
 * @param dest Must be a non-null unicast address
 * @param data The data to be sent. This is not a copy, so don't modify it
 * @param offset Position in <tt>data</tt> of the first byte to send (presumably; confirm against implementations)
 * @param length Number of bytes to send (presumably; confirm against implementations)
 * @throws Exception If sending fails
 */
public abstract void sendToSingleMember(Address dest, byte[] data,
        int offset, int length) throws Exception;
0372:
/** Returns transport-specific information; the result is appended to the diagnostics text built by _getInfo(). */
public abstract String getInfo();
0374:
/**
 * Hook to be implemented by subclasses.
 * NOTE(review): presumably invoked after a single message has been unmarshalled; the call site
 * is outside the visible part of the file - confirm there.
 */
public abstract void postUnmarshalling(Message msg, Address dest,
        Address src, boolean multicast);
0377:
/**
 * Hook to be implemented by subclasses.
 * NOTE(review): presumably invoked for each message unmarshalled from a message list; the call
 * site is outside the visible part of the file - confirm there.
 */
public abstract void postUnmarshallingList(Message msg,
        Address dest, boolean multicast);
0380:
0381: private StringBuffer _getInfo() {
0382: StringBuffer sb = new StringBuffer();
0383: sb.append(local_addr).append(" (").append(channel_name).append(
0384: ") ").append("\n");
0385: sb.append("local_addr=").append(local_addr).append("\n");
0386: sb.append("group_name=").append(channel_name).append("\n");
0387: sb.append("version=").append(Version.description).append(
0388: ", cvs=\"").append(Version.cvs).append("\"\n");
0389: sb.append("view: ").append(view).append('\n');
0390: sb.append(getInfo());
0391: return sb;
0392: }
0393:
0394: private void handleDiagnosticProbe(SocketAddress sender,
0395: DatagramSocket sock, String request) {
0396: try {
0397: StringTokenizer tok = new StringTokenizer(request);
0398: String req = tok.nextToken();
0399: StringBuffer info = new StringBuffer("n/a");
0400: if (req.trim().toLowerCase().startsWith("query")) {
0401: ArrayList l = new ArrayList(tok.countTokens());
0402: while (tok.hasMoreTokens())
0403: l.add(tok.nextToken().trim().toLowerCase());
0404:
0405: info = _getInfo();
0406:
0407: if (l.contains("jmx")) {
0408: Channel ch = stack.getChannel();
0409: if (ch != null) {
0410: Map m = ch.dumpStats();
0411: StringBuffer sb = new StringBuffer();
0412: sb.append("stats:\n");
0413: for (Iterator it = m.entrySet().iterator(); it
0414: .hasNext();) {
0415: sb.append(it.next()).append("\n");
0416: }
0417: info.append(sb);
0418: }
0419: }
0420: if (l.contains("props")) {
0421: String p = stack.printProtocolSpecAsXML();
0422: info.append("\nprops:\n").append(p);
0423: }
0424: }
0425:
0426: byte[] diag_rsp = info.toString().getBytes();
0427: if (log.isDebugEnabled())
0428: log.debug("sending diag response to " + sender);
0429: sendResponse(sock, sender, diag_rsp);
0430: } catch (Throwable t) {
0431: if (log.isErrorEnabled())
0432: log.error("failed sending diag rsp to " + sender, t);
0433: }
0434: }
0435:
0436: private static void sendResponse(DatagramSocket sock,
0437: SocketAddress sender, byte[] buf) throws IOException {
0438: DatagramPacket p = new DatagramPacket(buf, 0, buf.length,
0439: sender);
0440: sock.send(p);
0441: }
0442:
0443: /* ------------------------------------------------------------------------------- */
0444:
0445: /*------------------------------ Protocol interface ------------------------------ */
0446:
0447: public void init() throws Exception {
0448: super .init();
0449: if (bind_addr != null) {
0450: Map m = new HashMap(1);
0451: m.put("bind_addr", bind_addr);
0452: passUp(new Event(Event.CONFIG, m));
0453: }
0454: }
0455:
0456: /**
0457: * Creates the unicast and multicast sockets and starts the unicast and multicast receiver threads
0458: */
0459: public void start() throws Exception {
0460: timer = stack.timer;
0461: if (timer == null)
0462: throw new Exception("timer is null");
0463:
0464: if (enable_diagnostics) {
0465: diag_handler = new DiagnosticsHandler();
0466: diag_handler.start();
0467: }
0468:
0469: if (use_incoming_packet_handler) {
0470: incoming_packet_queue = new Queue();
0471: incoming_packet_handler = new IncomingPacketHandler();
0472: incoming_packet_handler.start();
0473: }
0474:
0475: if (loopback) {
0476: incoming_msg_queue = new Queue();
0477: incoming_msg_handler = new IncomingMessageHandler();
0478: incoming_msg_handler.start();
0479: }
0480:
0481: if (use_outgoing_packet_handler) {
0482: outgoing_queue = new BoundedLinkedQueue(
0483: outgoing_queue_max_size);
0484: outgoing_packet_handler = new OutgoingPacketHandler();
0485: outgoing_packet_handler.start();
0486: }
0487:
0488: if (enable_bundling) {
0489: bundler = new Bundler();
0490: }
0491:
0492: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
0493: }
0494:
0495: public void stop() {
0496: if (diag_handler != null) {
0497: diag_handler.stop();
0498: diag_handler = null;
0499: }
0500:
0501: // 1. Stop the outgoing packet handler thread
0502: if (outgoing_packet_handler != null)
0503: outgoing_packet_handler.stop();
0504:
0505: // 2. Stop the incoming packet handler thread
0506: if (incoming_packet_handler != null)
0507: incoming_packet_handler.stop();
0508:
0509: // 3. Finally stop the incoming message handler
0510: if (incoming_msg_handler != null)
0511: incoming_msg_handler.stop();
0512: }
0513:
0514: /**
0515: * Setup the Protocol instance according to the configuration string
0516: * @return true if no other properties are left.
0517: * false if the properties still have data in them, ie ,
0518: * properties are left over and not handled by the protocol stack
0519: */
0520: public boolean setProperties(Properties props) {
0521: super .setProperties(props);
0522:
0523: boolean ignore_systemprops = Util
0524: .isBindAddressPropertyIgnored();
0525: String str = Util.getProperty(new String[] { Global.BIND_ADDR,
0526: Global.BIND_ADDR_OLD }, props, "bind_addr",
0527: ignore_systemprops, null);
0528:
0529: if (str != null) {
0530: try {
0531: bind_addr = InetAddress.getByName(str);
0532: } catch (UnknownHostException unknown) {
0533: if (log.isFatalEnabled())
0534: log
0535: .fatal("(bind_addr): host " + str
0536: + " not known");
0537: return false;
0538: }
0539: props.remove("bind_addr");
0540: }
0541:
0542: str = props.getProperty("use_local_host");
0543: if (str != null) {
0544: use_local_host = new Boolean(str).booleanValue();
0545: props.remove("use_local_host");
0546: }
0547:
0548: str = props.getProperty("bind_to_all_interfaces");
0549: if (str != null) {
0550: receive_on_all_interfaces = new Boolean(str).booleanValue();
0551: props.remove("bind_to_all_interfaces");
0552: log
0553: .warn("bind_to_all_interfaces has been deprecated; use receive_on_all_interfaces instead");
0554: }
0555:
0556: str = props.getProperty("receive_on_all_interfaces");
0557: if (str != null) {
0558: receive_on_all_interfaces = new Boolean(str).booleanValue();
0559: props.remove("receive_on_all_interfaces");
0560: }
0561:
0562: str = props.getProperty("receive_interfaces");
0563: if (str != null) {
0564: try {
0565: receive_interfaces = parseInterfaceList(str);
0566: props.remove("receive_interfaces");
0567: } catch (Exception e) {
0568: log.error("error determining interfaces (" + str + ")",
0569: e);
0570: return false;
0571: }
0572: }
0573:
0574: str = props.getProperty("send_on_all_interfaces");
0575: if (str != null) {
0576: send_on_all_interfaces = new Boolean(str).booleanValue();
0577: props.remove("send_on_all_interfaces");
0578: }
0579:
0580: str = props.getProperty("send_interfaces");
0581: if (str != null) {
0582: try {
0583: send_interfaces = parseInterfaceList(str);
0584: props.remove("send_interfaces");
0585: } catch (Exception e) {
0586: log.error("error determining interfaces (" + str + ")",
0587: e);
0588: return false;
0589: }
0590: }
0591:
0592: str = props.getProperty("bind_port");
0593: if (str != null) {
0594: bind_port = Integer.parseInt(str);
0595: props.remove("bind_port");
0596: }
0597:
0598: str = props.getProperty("port_range");
0599: if (str != null) {
0600: port_range = Integer.parseInt(str);
0601: props.remove("port_range");
0602: }
0603:
0604: str = props.getProperty("loopback");
0605: if (str != null) {
0606: loopback = Boolean.valueOf(str).booleanValue();
0607: props.remove("loopback");
0608: }
0609:
0610: str = props.getProperty("discard_incompatible_packets");
0611: if (str != null) {
0612: discard_incompatible_packets = Boolean.valueOf(str)
0613: .booleanValue();
0614: props.remove("discard_incompatible_packets");
0615: }
0616:
0617: // this is deprecated, just left for compatibility (use use_incoming_packet_handler)
0618: str = props.getProperty("use_packet_handler");
0619: if (str != null) {
0620: use_incoming_packet_handler = Boolean.valueOf(str)
0621: .booleanValue();
0622: props.remove("use_packet_handler");
0623: if (log.isWarnEnabled())
0624: log
0625: .warn("'use_packet_handler' is deprecated; use 'use_incoming_packet_handler' instead");
0626: }
0627:
0628: str = props.getProperty("use_incoming_packet_handler");
0629: if (str != null) {
0630: use_incoming_packet_handler = Boolean.valueOf(str)
0631: .booleanValue();
0632: props.remove("use_incoming_packet_handler");
0633: }
0634:
0635: str = props.getProperty("use_outgoing_packet_handler");
0636: if (str != null) {
0637: use_outgoing_packet_handler = Boolean.valueOf(str)
0638: .booleanValue();
0639: props.remove("use_outgoing_packet_handler");
0640: }
0641:
0642: str = props.getProperty("outgoing_queue_max_size");
0643: if (str != null) {
0644: outgoing_queue_max_size = Integer.parseInt(str);
0645: props.remove("outgoing_queue_max_size");
0646: if (outgoing_queue_max_size <= 0) {
0647: if (log.isWarnEnabled())
0648: log.warn("outgoing_queue_max_size of "
0649: + outgoing_queue_max_size
0650: + " is invalid, setting it to 1");
0651: outgoing_queue_max_size = 1;
0652: }
0653: }
0654:
0655: str = props.getProperty("max_bundle_size");
0656: if (str != null) {
0657: int bundle_size = Integer.parseInt(str);
0658: if (bundle_size > max_bundle_size) {
0659: if (log.isErrorEnabled())
0660: log
0661: .error("max_bundle_size ("
0662: + bundle_size
0663: + ") is greater than largest TP fragmentation size ("
0664: + max_bundle_size + ')');
0665: return false;
0666: }
0667: if (bundle_size <= 0) {
0668: if (log.isErrorEnabled())
0669: log.error("max_bundle_size (" + bundle_size
0670: + ") is <= 0");
0671: return false;
0672: }
0673: max_bundle_size = bundle_size;
0674: props.remove("max_bundle_size");
0675: }
0676:
0677: str = props.getProperty("max_bundle_timeout");
0678: if (str != null) {
0679: max_bundle_timeout = Long.parseLong(str);
0680: if (max_bundle_timeout <= 0) {
0681: if (log.isErrorEnabled())
0682: log.error("max_bundle_timeout of "
0683: + max_bundle_timeout + " is invalid");
0684: return false;
0685: }
0686: props.remove("max_bundle_timeout");
0687: }
0688:
0689: str = props.getProperty("enable_bundling");
0690: if (str != null) {
0691: enable_bundling = Boolean.valueOf(str).booleanValue();
0692: props.remove("enable_bundling");
0693: }
0694:
0695: str = props.getProperty("use_addr_translation");
0696: if (str != null) {
0697: use_addr_translation = Boolean.valueOf(str).booleanValue();
0698: props.remove("use_addr_translation");
0699: }
0700:
0701: str = props.getProperty("enable_diagnostics");
0702: if (str != null) {
0703: enable_diagnostics = Boolean.valueOf(str).booleanValue();
0704: props.remove("enable_diagnostics");
0705: }
0706:
0707: str = props.getProperty("diagnostics_addr");
0708: if (str != null) {
0709: diagnostics_addr = str;
0710: props.remove("diagnostics_addr");
0711: }
0712:
0713: str = props.getProperty("diagnostics_port");
0714: if (str != null) {
0715: diagnostics_port = Integer.parseInt(str);
0716: props.remove("diagnostics_port");
0717: }
0718:
0719: if (enable_bundling) {
0720: //if (use_outgoing_packet_handler == false)
0721: // if(log.isWarnEnabled()) log.warn("enable_bundling is true; setting use_outgoing_packet_handler=true");
0722: // use_outgoing_packet_handler=true;
0723: }
0724:
0725: return true;
0726: }
0727:
0728: /**
0729: * This prevents the up-handler thread to be created, which essentially is superfluous:
0730: * messages are received from the network rather than from a layer below.
0731: * DON'T REMOVE !
0732: */
0733: public void startUpHandler() {
0734: }
0735:
0736: /**
0737: * handle the UP event.
0738: * @param evt - the event being send from the stack
0739: */
0740: public void up(Event evt) {
0741: switch (evt.getType()) {
0742: case Event.CONFIG:
0743: passUp(evt);
0744: if (log.isDebugEnabled())
0745: log.debug("received CONFIG event: " + evt.getArg());
0746: handleConfigEvent((HashMap) evt.getArg());
0747: return;
0748: }
0749: passUp(evt);
0750: }
0751:
0752: /**
0753: * Caller by the layer above this layer. Usually we just put this Message
0754: * into the send queue and let one or more worker threads handle it. A worker thread
0755: * then removes the Message from the send queue, performs a conversion and adds the
0756: * modified Message to the send queue of the layer below it, by calling down()).
0757: */
0758: public void down(Event evt) {
0759: if (evt.getType() != Event.MSG) { // unless it is a message handle it and respond
0760: handleDownEvent(evt);
0761: return;
0762: }
0763:
0764: Message msg = (Message) evt.getArg();
0765: if (header != null) {
0766: // added patch by Roland Kurmann (March 20 2003)
0767: // msg.putHeader(name, new TpHeader(channel_name));
0768: msg.putHeader(name, header);
0769: }
0770:
0771: // Because we don't call Protocol.passDown(), we notify the observer directly (e.g. PerfObserver).
0772: // This way, we still have performance numbers for TP
0773: if (observer != null)
0774: observer.passDown(evt);
0775:
0776: setSourceAddress(msg); // very important !! listToBuffer() will fail with a null src address !!
0777: if (log.isTraceEnabled()) {
0778: StringBuffer sb = new StringBuffer("sending msg to ")
0779: .append(msg.getDest()).append(" (src=").append(
0780: msg.getSrc()).append("), headers are ")
0781: .append(msg.getHeaders());
0782: log.trace(sb.toString());
0783: }
0784:
0785: // Don't send if destination is local address. Instead, switch dst and src and put in up_queue.
0786: // If multicast message, loopback a copy directly to us (but still multicast). Once we receive this,
0787: // we will discard our own multicast message
0788: Address dest = msg.getDest();
0789: boolean multicast = dest == null || dest.isMulticastAddress();
0790: if (loopback && (multicast || dest.equals(local_addr))) {
0791: Message copy = msg.copy();
0792:
0793: // copy.removeHeader(name); // we don't remove the header
0794: copy.setSrc(local_addr);
0795: // copy.setDest(dest);
0796:
0797: if (log.isTraceEnabled())
0798: log.trace(new StringBuffer("looping back message ")
0799: .append(copy));
0800: try {
0801: incoming_msg_queue.add(copy);
0802: } catch (QueueClosedException e) {
0803: // log.error("failed adding looped back message to incoming_msg_queue", e);
0804: }
0805:
0806: if (!multicast)
0807: return;
0808: }
0809:
0810: try {
0811: if (use_outgoing_packet_handler)
0812: outgoing_queue.put(msg);
0813: else
0814: send(msg, dest, multicast);
0815: } catch (QueueClosedException closed_ex) {
0816: } catch (InterruptedException interruptedEx) {
0817: } catch (Throwable e) {
0818: if (log.isErrorEnabled()) {
0819: String dst = msg.getDest() == null ? "null" : msg
0820: .getDest().toString();
0821: log.error("failed sending message to " + dst + " ("
0822: + msg.getLength() + " bytes)", e.getCause());
0823: }
0824: }
0825: }
0826:
0827: /*--------------------------- End of Protocol interface -------------------------- */
0828:
0829: /* ------------------------------ Private Methods -------------------------------- */
0830:
0831: /**
0832: * If the sender is null, set our own address. We cannot just go ahead and set the address
0833: * anyway, as we might be sending a message on behalf of someone else ! E.gin case of
0834: * retransmission, when the original sender has crashed, or in a FLUSH protocol when we
0835: * have to return all unstable messages with the FLUSH_OK response.
0836: */
0837: private void setSourceAddress(Message msg) {
0838: if (msg.getSrc() == null)
0839: msg.setSrc(local_addr);
0840: }
0841:
0842: /**
0843: * Subclasses must call this method when a unicast or multicast message has been received.
0844: * Declared final so subclasses cannot override this method.
0845: *
0846: * @param dest
0847: * @param sender
0848: * @param data
0849: * @param offset
0850: * @param length
0851: */
0852: protected final void receive(Address dest, Address sender,
0853: byte[] data, int offset, int length) {
0854: if (data == null)
0855: return;
0856:
0857: // if(length == 4) { // received a diagnostics probe
0858: // if(data[offset] == 'd' && data[offset+1] == 'i' && data[offset+2] == 'a' && data[offset+3] == 'g') {
0859: // handleDiagnosticProbe(sender);
0860: // return;
0861: // }
0862: // }
0863:
0864: boolean mcast = dest == null || dest.isMulticastAddress();
0865: if (log.isTraceEnabled()) {
0866: StringBuffer sb = new StringBuffer("received (");
0867: sb.append(mcast ? "mcast) " : "ucast) ").append(length)
0868: .append(" bytes from ").append(sender);
0869: log.trace(sb.toString());
0870: }
0871:
0872: try {
0873: if (use_incoming_packet_handler) {
0874: byte[] tmp = new byte[length];
0875: System.arraycopy(data, offset, tmp, 0, length);
0876: incoming_packet_queue.add(new IncomingQueueEntry(dest,
0877: sender, tmp, 0, length));
0878: } else
0879: handleIncomingPacket(dest, sender, data, offset, length);
0880: } catch (Throwable t) {
0881: if (log.isErrorEnabled())
0882: log.error(
0883: new StringBuffer("failed handling data from ")
0884: .append(sender), t);
0885: }
0886: }
0887:
0888: /**
0889: * Processes a packet read from either the multicast or unicast socket. Needs to be synchronized because
0890: * mcast or unicast socket reads can be concurrent.
0891: * Correction (bela April 19 2005): we access no instance variables, all vars are allocated on the stack, so
0892: * this method should be reentrant: removed 'synchronized' keyword
0893: */
0894: private void handleIncomingPacket(Address dest, Address sender,
0895: byte[] data, int offset, int length) {
0896: Message msg = null;
0897: List l = null; // used if bundling is enabled
0898: short version;
0899: boolean is_message_list, multicast;
0900: byte flags;
0901:
0902: try {
0903: synchronized (in_stream) {
0904: in_stream.setData(data, offset, length);
0905: buf_in_stream.reset(length);
0906: version = dis.readShort();
0907: if (Version.isBinaryCompatible(version) == false) {
0908: if (log.isWarnEnabled()) {
0909: StringBuffer sb = new StringBuffer();
0910: sb.append("packet from ").append(sender)
0911: .append(" has different version (")
0912: .append(version);
0913: sb.append(") from ours (").append(
0914: Version.printVersion()).append("). ");
0915: if (discard_incompatible_packets)
0916: sb.append("Packet is discarded");
0917: else
0918: sb.append("This may cause problems");
0919: log.warn(sb);
0920: }
0921: if (discard_incompatible_packets)
0922: return;
0923: }
0924:
0925: flags = dis.readByte();
0926: is_message_list = (flags & LIST) == LIST;
0927: multicast = (flags & MULTICAST) == MULTICAST;
0928:
0929: if (is_message_list)
0930: l = bufferToList(dis, dest, multicast);
0931: else
0932: msg = bufferToMessage(dis, dest, sender, multicast);
0933: }
0934:
0935: LinkedList msgs = new LinkedList();
0936: if (is_message_list) {
0937: for (Enumeration en = l.elements(); en
0938: .hasMoreElements();)
0939: msgs.add(en.nextElement());
0940: } else
0941: msgs.add(msg);
0942:
0943: Address src;
0944: for (Iterator it = msgs.iterator(); it.hasNext();) {
0945: msg = (Message) it.next();
0946: src = msg.getSrc();
0947: if (loopback) {
0948: if (multicast && src != null
0949: && local_addr.equals(src)) { // discard own loopback multicast packets
0950: it.remove();
0951: }
0952: } else
0953: handleIncomingMessage(msg);
0954: }
0955: if (incoming_msg_queue != null && msgs.size() > 0)
0956: incoming_msg_queue.addAll(msgs);
0957: } catch (QueueClosedException closed_ex) {
0958: ; // swallow exception
0959: } catch (Throwable t) {
0960: if (log.isErrorEnabled())
0961: log.error("failed unmarshalling message", t);
0962: }
0963: }
0964:
0965: private void handleIncomingMessage(Message msg) {
0966: Event evt;
0967: TpHeader hdr;
0968:
0969: if (stats) {
0970: num_msgs_received++;
0971: num_bytes_received += msg.getLength();
0972: }
0973:
0974: evt = new Event(Event.MSG, msg);
0975: if (log.isTraceEnabled()) {
0976: StringBuffer sb = new StringBuffer("message is ").append(
0977: msg).append(", headers are ").append(
0978: msg.getHeaders());
0979: log.trace(sb);
0980: }
0981:
0982: /* Because Protocol.up() is never called by this bottommost layer, we call up() directly in the observer.
0983: * This allows e.g. PerfObserver to get the time of reception of a message */
0984: if (observer != null)
0985: observer.up(evt, up_queue.size());
0986:
0987: hdr = (TpHeader) msg.getHeader(name); // replaced removeHeader() with getHeader()
0988: if (hdr != null) {
0989:
0990: /* Discard all messages destined for a channel with a different name */
0991: String ch_name = hdr.channel_name;
0992:
0993: // Discard if message's group name is not the same as our group name unless the
0994: // message is a diagnosis message (special group name DIAG_GROUP)
0995: if (ch_name != null && channel_name != null
0996: && !channel_name.equals(ch_name)
0997: && !ch_name.equals(Util.DIAG_GROUP)) {
0998: if (log.isWarnEnabled())
0999: log
1000: .warn(new StringBuffer(
1001: "discarded message from different group \"")
1002: .append(ch_name).append(
1003: "\" (our group is \"")
1004: .append(channel_name).append(
1005: "\"). Sender was ").append(
1006: msg.getSrc()));
1007: return;
1008: }
1009: } else {
1010: if (log.isTraceEnabled())
1011: log
1012: .trace(new StringBuffer(
1013: "message does not have a transport header, msg is ")
1014: .append(msg).append(", headers are ")
1015: .append(msg.getHeaders()).append(
1016: ", will be discarded"));
1017: return;
1018: }
1019: passUp(evt);
1020: }
1021:
1022: /** Internal method to serialize and send a message. This method is not reentrant */
1023: private void send(Message msg, Address dest, boolean multicast)
1024: throws Exception {
1025: if (enable_bundling) {
1026: bundler.send(msg, dest);
1027: return;
1028: }
1029:
1030: // Needs to be synchronized because we can have possible concurrent access, e.g.
1031: // Discovery uses a separate thread to send out discovery messages
1032: // We would *not* need to sync between send(), OutgoingPacketHandler and BundlingOutgoingPacketHandler,
1033: // because only *one* of them is enabled
1034: Buffer buf;
1035: synchronized (out_stream) {
1036: buf = messageToBuffer(msg, multicast);
1037: doSend(buf, dest, multicast);
1038: }
1039: }
1040:
1041: private void doSend(Buffer buf, Address dest, boolean multicast)
1042: throws Exception {
1043: if (stats) {
1044: num_msgs_sent++;
1045: num_bytes_sent += buf.getLength();
1046: }
1047: if (multicast) {
1048: sendToAllMembers(buf.getBuf(), buf.getOffset(), buf
1049: .getLength());
1050: } else {
1051: sendToSingleMember(dest, buf.getBuf(), buf.getOffset(), buf
1052: .getLength());
1053: }
1054: }
1055:
1056: /**
1057: * This method needs to be synchronized on out_stream when it is called
1058: * @param msg
1059: * @return
1060: * @throws java.io.IOException
1061: */
1062: private Buffer messageToBuffer(Message msg, boolean multicast)
1063: throws Exception {
1064: Buffer retval;
1065: byte flags = 0;
1066:
1067: out_stream.reset();
1068: buf_out_stream.reset(out_stream.getCapacity());
1069: dos.reset();
1070: dos.writeShort(Version.version); // write the version
1071: if (multicast)
1072: flags += MULTICAST;
1073: dos.writeByte(flags);
1074: // preMarshalling(msg, dest, src); // allows for optimization by subclass
1075: msg.writeTo(dos);
1076: // postMarshalling(msg, dest, src); // allows for optimization by subclass
1077: dos.flush();
1078: retval = new Buffer(out_stream.getRawBuffer(), 0, out_stream
1079: .size());
1080: return retval;
1081: }
1082:
1083: private Message bufferToMessage(DataInputStream instream,
1084: Address dest, Address sender, boolean multicast)
1085: throws Exception {
1086: Message msg = new Message(false); // don't create headers, readFrom() will do this
1087: msg.readFrom(instream);
1088: postUnmarshalling(msg, dest, sender, multicast); // allows for optimization by subclass
1089: return msg;
1090: }
1091:
1092: private Buffer listToBuffer(List l, boolean multicast)
1093: throws Exception {
1094: Buffer retval;
1095: Address src;
1096: Message msg;
1097: byte flags = 0;
1098: int len = l != null ? l.size() : 0;
1099: boolean src_written = false;
1100: out_stream.reset();
1101: buf_out_stream.reset(out_stream.getCapacity());
1102: dos.reset();
1103: dos.writeShort(Version.version);
1104: flags += LIST;
1105: if (multicast)
1106: flags += MULTICAST;
1107: dos.writeByte(flags);
1108: dos.writeInt(len);
1109: for (Enumeration en = l.elements(); en.hasMoreElements();) {
1110: msg = (Message) en.nextElement();
1111: src = msg.getSrc();
1112: if (!src_written) {
1113: Util.writeAddress(src, dos);
1114: src_written = true;
1115: }
1116: // msg.setSrc(null);
1117: msg.writeTo(dos);
1118: // msg.setSrc(src);
1119: }
1120: dos.flush();
1121: retval = new Buffer(out_stream.getRawBuffer(), 0, out_stream
1122: .size());
1123: return retval;
1124: }
1125:
1126: private List bufferToList(DataInputStream instream, Address dest,
1127: boolean multicast) throws Exception {
1128: List l = new List();
1129: DataInputStream in = null;
1130: int len;
1131: Message msg;
1132: Address src;
1133:
1134: try {
1135: len = instream.readInt();
1136: src = Util.readAddress(instream);
1137: for (int i = 0; i < len; i++) {
1138: msg = new Message(false); // don't create headers, readFrom() will do this
1139: msg.readFrom(instream);
1140: postUnmarshallingList(msg, dest, multicast);
1141: msg.setSrc(src);
1142: l.add(msg);
1143: }
1144: return l;
1145: } finally {
1146: Util.close(in);
1147: }
1148: }
1149:
1150: /**
1151: *
1152: * @param s
1153: * @return List<NetworkInterface>
1154: */
1155: private java.util.List parseInterfaceList(String s)
1156: throws Exception {
1157: java.util.List interfaces = new ArrayList(10);
1158: if (s == null)
1159: return null;
1160:
1161: StringTokenizer tok = new StringTokenizer(s, ",");
1162: String interface_name;
1163: NetworkInterface intf;
1164:
1165: while (tok.hasMoreTokens()) {
1166: interface_name = tok.nextToken();
1167:
1168: // try by name first (e.g. (eth0")
1169: intf = NetworkInterface.getByName(interface_name);
1170:
1171: // next try by IP address or symbolic name
1172: if (intf == null)
1173: intf = NetworkInterface.getByInetAddress(InetAddress
1174: .getByName(interface_name));
1175:
1176: if (intf == null)
1177: throw new Exception("interface " + interface_name
1178: + " not found");
1179: if (interfaces.contains(intf)) {
1180: log.warn("did not add interface " + interface_name
1181: + " (already present in " + print(interfaces)
1182: + ")");
1183: } else {
1184: interfaces.add(intf);
1185: }
1186: }
1187: return interfaces;
1188: }
1189:
1190: private static String print(java.util.List interfaces) {
1191: StringBuffer sb = new StringBuffer();
1192: boolean first = true;
1193: NetworkInterface intf;
1194: for (Iterator it = interfaces.iterator(); it.hasNext();) {
1195: intf = (NetworkInterface) it.next();
1196: if (first) {
1197: first = false;
1198: } else {
1199: sb.append(", ");
1200: }
1201: sb.append(intf.getName());
1202: }
1203: return sb.toString();
1204: }
1205:
1206: protected void handleDownEvent(Event evt) {
1207: switch (evt.getType()) {
1208:
1209: case Event.TMP_VIEW:
1210: case Event.VIEW_CHANGE:
1211: synchronized (members) {
1212: view = (View) evt.getArg();
1213: members.clear();
1214: Vector tmpvec = view.getMembers();
1215: members.addAll(tmpvec);
1216: }
1217: break;
1218:
1219: case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
1220: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
1221: break;
1222:
1223: case Event.CONNECT:
1224: channel_name = (String) evt.getArg();
1225: header = new TpHeader(channel_name);
1226: setThreadNames();
1227:
1228: // removed March 18 2003 (bela), not needed (handled by GMS)
1229: // changed July 2 2003 (bela): we discard CONNECT_OK at the GMS level anyway, this might
1230: // be needed if we run without GMS though
1231: passUp(new Event(Event.CONNECT_OK));
1232: break;
1233:
1234: case Event.DISCONNECT:
1235: unsetThreadNames();
1236: passUp(new Event(Event.DISCONNECT_OK));
1237: break;
1238:
1239: case Event.CONFIG:
1240: if (log.isDebugEnabled())
1241: log.debug("received CONFIG event: " + evt.getArg());
1242: handleConfigEvent((HashMap) evt.getArg());
1243: break;
1244: }
1245: }
1246:
1247: protected void setThreadNames() {
1248: if (channel_name != null) {
1249: String tmp, prefix = Global.THREAD_PREFIX;
1250: if (incoming_packet_handler != null) {
1251: tmp = incoming_packet_handler.getName();
1252: if (tmp != null && tmp.indexOf(prefix) == -1) {
1253: tmp += prefix + channel_name + ")";
1254: incoming_packet_handler.setName(tmp);
1255: }
1256: }
1257: if (incoming_msg_handler != null) {
1258: tmp = incoming_msg_handler.getName();
1259: if (tmp != null && tmp.indexOf(prefix) == -1) {
1260: tmp += prefix + channel_name + ")";
1261: incoming_msg_handler.setName(tmp);
1262: }
1263: }
1264: if (outgoing_packet_handler != null) {
1265: tmp = outgoing_packet_handler.getName();
1266: if (tmp != null && tmp.indexOf(prefix) == -1) {
1267: tmp += prefix + channel_name + ")";
1268: outgoing_packet_handler.setName(tmp);
1269: }
1270: }
1271: if (diag_handler != null) {
1272: tmp = diag_handler.getName();
1273: if (tmp != null && tmp.indexOf(prefix) == -1) {
1274: tmp += prefix + channel_name + ")";
1275: diag_handler.setName(tmp);
1276: }
1277: }
1278: }
1279: }
1280:
1281: protected void unsetThreadNames() {
1282: if (channel_name != null) {
1283: String tmp, prefix = Global.THREAD_PREFIX;
1284: int index;
1285:
1286: tmp = incoming_packet_handler != null ? incoming_packet_handler
1287: .getName()
1288: : null;
1289: if (tmp != null) {
1290: index = tmp.indexOf(prefix);
1291: if (index > -1) {
1292: tmp = tmp.substring(0, index);
1293: incoming_packet_handler.setName(tmp);
1294: }
1295: }
1296:
1297: tmp = incoming_msg_handler != null ? incoming_msg_handler
1298: .getName() : null;
1299: if (tmp != null) {
1300: index = tmp.indexOf(prefix);
1301: if (index > -1) {
1302: tmp = tmp.substring(0, index);
1303: incoming_msg_handler.setName(tmp);
1304: }
1305: }
1306:
1307: tmp = outgoing_packet_handler != null ? outgoing_packet_handler
1308: .getName()
1309: : null;
1310: if (tmp != null) {
1311: index = tmp.indexOf(prefix);
1312: if (index > -1) {
1313: tmp = tmp.substring(0, index);
1314: outgoing_packet_handler.setName(tmp);
1315: }
1316: }
1317: tmp = diag_handler != null ? diag_handler.getName() : null;
1318: if (tmp != null) {
1319: index = tmp.indexOf(prefix);
1320: if (index > -1) {
1321: tmp = tmp.substring(0, index);
1322: diag_handler.setName(tmp);
1323: }
1324: }
1325: }
1326: }
1327:
1328: protected void handleConfigEvent(HashMap map) {
1329: if (map == null)
1330: return;
1331: if (map.containsKey("additional_data")) {
1332: additional_data = (byte[]) map.get("additional_data");
1333: if (local_addr instanceof IpAddress)
1334: ((IpAddress) local_addr)
1335: .setAdditionalData(additional_data);
1336: }
1337: }
1338:
1339: /* ----------------------------- End of Private Methods ---------------------------------------- */
1340:
1341: /* ----------------------------- Inner Classes ---------------------------------------- */
1342:
1343: static class IncomingQueueEntry {
1344: Address dest = null;
1345: Address sender = null;
1346: byte[] buf;
1347: int offset, length;
1348:
1349: IncomingQueueEntry(Address dest, Address sender, byte[] buf,
1350: int offset, int length) {
1351: this .dest = dest;
1352: this .sender = sender;
1353: this .buf = buf;
1354: this .offset = offset;
1355: this .length = length;
1356: }
1357: }
1358:
1359: /**
1360: * This thread fetches byte buffers from the packet_queue, converts them into messages and passes them up
1361: * to the higher layer (done in handleIncomingUdpPacket()).
1362: */
1363: class IncomingPacketHandler implements Runnable {
1364: Thread t = null;
1365:
1366: String getName() {
1367: return t != null ? t.getName() : null;
1368: }
1369:
1370: void setName(String thread_name) {
1371: if (t != null)
1372: t.setName(thread_name);
1373: }
1374:
1375: void start() {
1376: if (t == null || !t.isAlive()) {
1377: t = new Thread(Util.getGlobalThreadGroup(), this ,
1378: "IncomingPacketHandler");
1379: t.setDaemon(true);
1380: t.start();
1381: }
1382: }
1383:
1384: void stop() {
1385: Thread tmp = t;
1386: t = null;
1387: incoming_packet_queue.close(true); // should terminate the packet_handler thread too
1388: if (tmp != null) {
1389: try {
1390: tmp.join(10000);
1391: } catch (InterruptedException e) {
1392: }
1393: if (tmp.isAlive()) {
1394: if (log.isWarnEnabled())
1395: log
1396: .warn("IncomingPacketHandler thread was interrupted, but is still alive");
1397: }
1398: }
1399: }
1400:
1401: public void run() {
1402: IncomingQueueEntry entry;
1403: while (!incoming_packet_queue.closed()
1404: && Thread.currentThread().equals(t)) {
1405: try {
1406: entry = (IncomingQueueEntry) incoming_packet_queue
1407: .remove();
1408: handleIncomingPacket(entry.dest, entry.sender,
1409: entry.buf, entry.offset, entry.length);
1410: } catch (QueueClosedException closed_ex) {
1411: break;
1412: } catch (Throwable ex) {
1413: if (log.isErrorEnabled())
1414: log.error("error processing incoming packet",
1415: ex);
1416: }
1417: }
1418: if (log.isTraceEnabled())
1419: log.trace("incoming packet handler terminating");
1420: }
1421: }
1422:
1423: class IncomingMessageHandler implements Runnable {
1424: Thread t;
1425: int i = 0;
1426:
1427: String getName() {
1428: return t != null ? t.getName() : null;
1429: }
1430:
1431: void setName(String thread_name) {
1432: if (t != null)
1433: t.setName(thread_name);
1434: }
1435:
1436: public void start() {
1437: if (t == null || !t.isAlive()) {
1438: t = new Thread(Util.getGlobalThreadGroup(), this ,
1439: "IncomingMessageHandler");
1440: t.setDaemon(true);
1441: t.start();
1442: }
1443: }
1444:
1445: public void stop() {
1446: incoming_msg_queue.close(true);
1447: t = null;
1448: }
1449:
1450: public void run() {
1451: Message msg;
1452: while (!incoming_msg_queue.closed()
1453: && Thread.currentThread().equals(t)) {
1454: try {
1455: msg = (Message) incoming_msg_queue.remove();
1456: handleIncomingMessage(msg);
1457: } catch (QueueClosedException closed_ex) {
1458: break;
1459: } catch (Throwable ex) {
1460: if (log.isErrorEnabled())
1461: log.error("error processing incoming message",
1462: ex);
1463: }
1464: }
1465: if (log.isTraceEnabled())
1466: log.trace("incoming message handler terminating");
1467: }
1468: }
1469:
1470: /**
1471: * This thread fetches byte buffers from the outgoing_packet_queue, converts them into messages and sends them
1472: * using the unicast or multicast socket
1473: */
1474: class OutgoingPacketHandler implements Runnable {
1475: Thread t = null;
1476: byte[] buf;
1477: DatagramPacket packet;
1478:
1479: String getName() {
1480: return t != null ? t.getName() : null;
1481: }
1482:
1483: void setName(String thread_name) {
1484: if (t != null)
1485: t.setName(thread_name);
1486: }
1487:
1488: void start() {
1489: if (t == null || !t.isAlive()) {
1490: t = new Thread(Util.getGlobalThreadGroup(), this ,
1491: "OutgoingPacketHandler");
1492: t.setDaemon(true);
1493: t.start();
1494: }
1495: }
1496:
1497: void stop() {
1498: Thread tmp = t;
1499: t = null;
1500: if (tmp != null) {
1501: tmp.interrupt();
1502: }
1503: }
1504:
1505: public void run() {
1506: Message msg;
1507:
1508: while (t != null && Thread.currentThread().equals(t)) {
1509: try {
1510: msg = (Message) outgoing_queue.take();
1511: handleMessage(msg);
1512: } catch (QueueClosedException closed_ex) {
1513: break;
1514: } catch (InterruptedException interruptedEx) {
1515: } catch (Throwable th) {
1516: if (log.isErrorEnabled())
1517: log.error("exception sending packet", th);
1518: }
1519: msg = null; // let's give the garbage collector a hand... this is probably useless though
1520: }
1521: if (log.isTraceEnabled())
1522: log.trace("outgoing message handler terminating");
1523: }
1524:
1525: protected void handleMessage(Message msg) throws Throwable {
1526: Address dest = msg.getDest();
1527: send(msg, dest, dest == null || dest.isMulticastAddress());
1528: }
1529:
1530: }
1531:
1532: /**
1533: * Bundles smaller messages into bigger ones. Collects messages in a list until
1534: * messages of a total of <tt>max_bundle_size bytes</tt> have accumulated, or until
1535: * <tt>max_bundle_timeout</tt> milliseconds have elapsed, whichever is first. Messages
1536: * are unbundled at the receiver.
1537: */
1538: // private class BundlingOutgoingPacketHandler extends OutgoingPacketHandler {
1539: // /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */
1540: // final HashMap msgs=new HashMap(11);
1541: // long count=0; // current number of bytes accumulated
1542: // int num_msgs=0;
1543: // long start=0;
1544: // long wait_time=0; // wait for removing messages from the queue
1545: //
1546: //
1547: //
1548: // private void init() {
1549: // wait_time=start=count=0;
1550: // }
1551: //
1552: // void start() {
1553: // init();
1554: // super.start();
1555: // t.setName("BundlingOutgoingPacketHandler");
1556: // }
1557: //
1558: // void stop() {
1559: // // bundleAndSend();
1560: // super.stop();
1561: // }
1562: //
1563: // public void run() {
1564: // Message msg;
1565: // long length;
1566: // while(t != null && Thread.currentThread().equals(t)) {
1567: // try {
1568: // msg=(Message)outgoing_queue.poll(wait_time);
1569: // if(msg == null)
1570: // throw new TimeoutException();
1571: // length=msg.size();
1572: // checkLength(length);
1573: // if(start == 0)
1574: // start=System.currentTimeMillis();
1575: //
1576: // if(count + length >= max_bundle_size) {
1577: // bundleAndSend();
1578: // count=0;
1579: // start=System.currentTimeMillis();
1580: // }
1581: //
1582: // addMessage(msg);
1583: // count+=length;
1584: //
1585: // wait_time=max_bundle_timeout - (System.currentTimeMillis() - start);
1586: // if(wait_time <= 0) {
1587: // bundleAndSend();
1588: // init();
1589: // }
1590: // }
1591: // catch(QueueClosedException queue_closed_ex) {
1592: // bundleAndSend();
1593: // break;
1594: // }
1595: // catch(TimeoutException timeout_ex) {
1596: // bundleAndSend();
1597: // init();
1598: // }
1599: // catch(Throwable ex) {
1600: // log.error("failure in bundling", ex);
1601: // }
1602: // }
1603: // if(log.isTraceEnabled()) log.trace("BundlingOutgoingPacketHandler thread terminated");
1604: // }
1605: //
1606: //
1607: //
1608: //
1609: // private void checkLength(long len) throws Exception {
1610: // if(len > max_bundle_size)
1611: // throw new Exception("message size (" + len + ") is greater than max bundling size (" + max_bundle_size +
1612: // "). Set the fragmentation/bundle size in FRAG and TP correctly");
1613: // }
1614: //
1615: //
1616: // private void addMessage(Message msg) { // no sync needed, never called by multiple threads concurrently
1617: // List tmp;
1618: // Address dst=msg.getDest();
1619: // tmp=(List)msgs.get(dst);
1620: // if(tmp == null) {
1621: // tmp=new List();
1622: // msgs.put(dst, tmp);
1623: // }
1624: // tmp.add(msg);
1625: // num_msgs++;
1626: // }
1627: //
1628: //
1629: //
1630: // private void bundleAndSend() {
1631: // Map.Entry entry;
1632: // Address dst;
1633: // Buffer buffer;
1634: // List l;
1635: // long stop_time=System.currentTimeMillis();
1636: //
1637: // if(msgs.size() == 0)
1638: // return;
1639: //
1640: // try {
1641: // if(log.isTraceEnabled()) {
1642: // StringBuffer sb=new StringBuffer("sending ").append(num_msgs).append(" msgs (");
1643: // sb.append(count).append(" bytes, ").append(stop_time-start).append("ms)");
1644: // sb.append(" to ").append(msgs.size()).append(" destination(s)");
1645: // if(msgs.size() > 1) sb.append(" (dests=").append(msgs.keySet()).append(")");
1646: // log.trace(sb.toString());
1647: // }
1648: // boolean multicast;
1649: // for(Iterator it=msgs.entrySet().iterator(); it.hasNext();) {
1650: // entry=(Map.Entry)it.next();
1651: // l=(List)entry.getValue();
1652: // if(l.size() == 0)
1653: // continue;
1654: // dst=(Address)entry.getKey();
1655: // multicast=dst == null || dst.isMulticastAddress();
1656: // synchronized(out_stream) {
1657: // try {
1658: // buffer=listToBuffer(l, multicast);
1659: // doSend(buffer, dst, multicast);
1660: // }
1661: // catch(Throwable e) {
1662: // if(log.isErrorEnabled()) log.error("exception sending msg", e);
1663: // }
1664: // }
1665: // }
1666: // }
1667: // finally {
1668: // msgs.clear();
1669: // num_msgs=0;
1670: // }
1671: // }
1672: // }
1673:
1674: private class Bundler {
1675: /** HashMap<Address, List<Message>>. Keys are destinations, values are lists of Messages */
1676: final HashMap msgs = new HashMap(36);
1677: long count = 0; // current number of bytes accumulated
1678: int num_msgs = 0;
1679: long start = 0;
1680: BundlingTimer bundling_timer = null;
1681:
1682: private synchronized void send(Message msg, Address dest)
1683: throws Exception {
1684: long length = msg.size();
1685: checkLength(length);
1686:
1687: if (start == 0)
1688: start = System.currentTimeMillis();
1689: if (count + length >= max_bundle_size) {
1690: cancelTimer();
1691: bundleAndSend(); // clears msgs and resets num_msgs
1692: }
1693:
1694: addMessage(msg, dest);
1695: count += length;
1696: startTimer(); // start timer if not running
1697: }
1698:
1699: /** Never called concurrently with cancelTimer - no need for synchronization */
1700: private void startTimer() {
1701: if (bundling_timer == null || bundling_timer.cancelled()) {
1702: bundling_timer = new BundlingTimer();
1703: timer.add(bundling_timer);
1704: }
1705: }
1706:
1707: /** Never called concurrently with startTimer() - no need for synchronization */
1708: private void cancelTimer() {
1709: if (bundling_timer != null) {
1710: bundling_timer.cancel();
1711: bundling_timer = null;
1712: }
1713: }
1714:
1715: private void addMessage(Message msg, Address dest) { // no sync needed, never called by multiple threads concurrently
1716: List tmp;
1717: synchronized (msgs) {
1718: tmp = (List) msgs.get(dest);
1719: if (tmp == null) {
1720: tmp = new List();
1721: msgs.put(dest, tmp);
1722: }
1723: tmp.add(msg);
1724: num_msgs++;
1725: }
1726: }
1727:
1728: private void bundleAndSend() {
1729: Map.Entry entry;
1730: Address dst;
1731: Buffer buffer;
1732: List l;
1733: Map copy;
1734:
1735: synchronized (msgs) {
1736: if (msgs.size() == 0)
1737: return;
1738: copy = new HashMap(msgs);
1739: if (log.isTraceEnabled()) {
1740: long stop = System.currentTimeMillis();
1741: double percentage = 100.0 / max_bundle_size * count;
1742: StringBuffer sb = new StringBuffer("sending ")
1743: .append(num_msgs).append(" msgs (");
1744: num_msgs = 0;
1745: sb
1746: .append(count)
1747: .append(
1748: " bytes ("
1749: + f.format(percentage)
1750: + "% of max_bundle_size), collected in "
1751: + +(stop - start)
1752: + "ms) to ").append(
1753: copy.size()).append(
1754: " destination(s)");
1755: if (copy.size() > 1)
1756: sb.append(" (dests=").append(copy.keySet())
1757: .append(")");
1758: log.trace(sb.toString());
1759: }
1760: msgs.clear();
1761: count = 0;
1762: }
1763:
1764: try {
1765: boolean multicast;
1766: for (Iterator it = copy.entrySet().iterator(); it
1767: .hasNext();) {
1768: entry = (Map.Entry) it.next();
1769: l = (List) entry.getValue();
1770: if (l.size() == 0)
1771: continue;
1772: dst = (Address) entry.getKey();
1773: multicast = dst == null || dst.isMulticastAddress();
1774: synchronized (out_stream) {
1775: try {
1776: buffer = listToBuffer(l, multicast);
1777: doSend(buffer, dst, multicast);
1778: } catch (Throwable e) {
1779: if (log.isErrorEnabled())
1780: log.error("exception sending msg: "
1781: + e.toString(), e.getCause());
1782: }
1783: }
1784: }
1785: } finally {
1786: start = 0;
1787: }
1788: }
1789:
1790: private void checkLength(long len) throws Exception {
1791: if (len > max_bundle_size)
1792: throw new Exception(
1793: "message size ("
1794: + len
1795: + ") is greater than max bundling size ("
1796: + max_bundle_size
1797: + "). Set the fragmentation/bundle size in FRAG and TP correctly");
1798: }
1799:
1800: private class BundlingTimer implements TimeScheduler.Task {
1801: boolean cancelled = false;
1802:
1803: void cancel() {
1804: cancelled = true;
1805: }
1806:
1807: public boolean cancelled() {
1808: return cancelled;
1809: }
1810:
1811: public long nextInterval() {
1812: return max_bundle_timeout;
1813: }
1814:
1815: public void run() {
1816: bundleAndSend();
1817: cancelled = true;
1818: }
1819: }
1820: }
1821:
1822: private class DiagnosticsHandler implements Runnable {
1823: Thread t = null;
1824: MulticastSocket diag_sock = null;
1825:
1826: DiagnosticsHandler() {
1827: }
1828:
1829: String getName() {
1830: return t != null ? t.getName() : null;
1831: }
1832:
1833: void setName(String thread_name) {
1834: if (t != null)
1835: t.setName(thread_name);
1836: }
1837:
1838: void start() throws IOException {
1839: diag_sock = new MulticastSocket(diagnostics_port);
1840: java.util.List interfaces = Util
1841: .getAllAvailableInterfaces();
1842: bindToInterfaces(interfaces, diag_sock);
1843:
1844: if (t == null || !t.isAlive()) {
1845: t = new Thread(Util.getGlobalThreadGroup(), this ,
1846: "DiagnosticsHandler");
1847: t.setDaemon(true);
1848: t.start();
1849: }
1850: }
1851:
1852: void stop() {
1853: if (diag_sock != null)
1854: diag_sock.close();
1855: t = null;
1856: }
1857:
1858: public void run() {
1859: byte[] buf = new byte[1500]; // MTU on most LANs
1860: DatagramPacket packet;
1861: while (!diag_sock.isClosed()
1862: && Thread.currentThread().equals(t)) {
1863: packet = new DatagramPacket(buf, 0, buf.length);
1864: try {
1865: diag_sock.receive(packet);
1866: handleDiagnosticProbe(packet.getSocketAddress(),
1867: diag_sock, new String(packet.getData(),
1868: packet.getOffset(), packet
1869: .getLength()));
1870: } catch (IOException e) {
1871: }
1872: }
1873: }
1874:
1875: private void bindToInterfaces(java.util.List interfaces,
1876: MulticastSocket s) {
1877: SocketAddress group_addr = new InetSocketAddress(
1878: diagnostics_addr, diagnostics_port);
1879: for (Iterator it = interfaces.iterator(); it.hasNext();) {
1880: NetworkInterface i = (NetworkInterface) it.next();
1881: try {
1882: if (i.getInetAddresses().hasMoreElements()) { // fix for VM crash - suggested by JJalenak@netopia.com
1883: s.joinGroup(group_addr, i);
1884: if (log.isTraceEnabled())
1885: log.trace("joined " + group_addr + " on "
1886: + i.getName());
1887: }
1888: } catch (IOException e) {
1889: log.warn("failed to join " + group_addr + " on "
1890: + i.getName() + ": " + e);
1891: }
1892: }
1893: }
1894: }
1895:
1896: }
|