001: package org.lateralnz.messaging.util;
002:
003: import java.io.Serializable;
004: import java.net.DatagramPacket;
005: import java.net.InetAddress;
006: import java.util.HashMap;
007: import java.util.Map;
008: import java.util.Iterator;
009:
010: import org.apache.log4j.Logger;
011:
012: import org.lateralnz.common.wrapper.IntHolder;
013: import org.lateralnz.common.util.Constants;
014: import org.lateralnz.common.util.NumericUtils;
015: import org.lateralnz.common.util.ObjectUtils;
016:
017: public class PacketUtils implements Constants {
018: private static final Logger log = Logger
019: .getLogger(PacketUtils.class.getName());
020: private static final int THIRTY_MINUTES = 1800000;
021: private static final int TEN_MINUTES = 600000;
022: private static IntHolder MSG_ID = new IntHolder(0); // ids for sent packets
023: private static HashMap receivedPackets = new HashMap(); // a map of received packets
024:
025: private static long time = System.currentTimeMillis();
026:
027: private PacketUtils() {
028: }
029:
030: public static final void dumpOldData() {
031: long time2 = System.currentTimeMillis() - time;
032: // after 30 minutes, check the packets map, and remove old packets
033: if (time2 > THIRTY_MINUTES) {
034: if (log.isInfoEnabled()) {
035: log.info("packet dump processing");
036: }
037: // reset timer
038: time = System.currentTimeMillis();
039: Iterator iter = ((HashMap) receivedPackets.clone())
040: .keySet().iterator();
041: PacketDataWrapper dw;
042: while (iter.hasNext()) {
043: String tmpkey = (String) iter.next();
044: dw = (PacketDataWrapper) receivedPackets.get(tmpkey);
045: // remove packets older than 10 minutes
046: if ((System.currentTimeMillis() - dw.getTimeCreated()) > TEN_MINUTES) {
047: if (log.isInfoEnabled()) {
048: log.info("dumping packet " + tmpkey);
049: }
050: iter.remove();
051: }
052: }
053: }
054: }
055:
056: public static final byte[] getPacketID() {
057: int id;
058: synchronized (MSG_ID) {
059: if (MSG_ID.value >= Integer.MAX_VALUE) {
060: MSG_ID.value = 0;
061: }
062: MSG_ID.value++;
063: id = MSG_ID.value;
064: }
065: return NumericUtils.toByteArray(id, false);
066: }
067:
068: public static Object reconstitute(DatagramPacket packet,
069: int receiveBufSize) throws Exception {
070: String addr = packet.getAddress().getHostAddress();
071: int id = NumericUtils.toInt(packet.getData(), 0, 4);
072: int max = NumericUtils.toInt(packet.getData(), 4, 4);
073: int ord = NumericUtils.toInt(packet.getData(), 8, 4);
074:
075: String key = addr + UNDERSCORE + id;
076: // work out if we've already received other packets with the same id
077: // from that host
078: PacketDataWrapper dw;
079: if (receivedPackets.containsKey(key)) {
080: dw = (PacketDataWrapper) receivedPackets.get(key);
081: } else {
082: dw = new PacketDataWrapper(max);
083: receivedPackets.put(key, dw);
084: }
085: byte[] tmp = new byte[packet.getData().length - 12];
086: System.arraycopy(packet.getData(), 12, tmp, 0, tmp.length);
087: dw.set(ord, tmp);
088:
089: // if we've received all the packets for this event
090: // we'll rebuild the object
091: if (dw.count >= max) {
092: receivedPackets.remove(key);
093:
094: return ObjectUtils.deserialize(dw.get());
095: } else {
096: return null;
097: }
098: }
099:
100: public static final DatagramPacket[] split(Serializable obj,
101: InetAddress addr, int port, int maximumBufferSize)
102: throws Exception {
103: byte[] data = ObjectUtils.serialize(obj);
104: // create an ID for the packet
105: byte[] bid = getPacketID();
106:
107: // calculate the number of packets required to send this data
108: int packNum = (int) Math.ceil((float) data.length
109: / (float) maximumBufferSize);
110: byte[] pnum = NumericUtils.toByteArray(packNum, false);
111:
112: int size = (data.length / maximumBufferSize)
113: + (data.length % maximumBufferSize > 0 ? 1 : 0);
114: DatagramPacket[] packets = new DatagramPacket[size];
115: // loop through the data
116: for (int k = 0, j = 0; j < data.length; j += maximumBufferSize, k++) {
117: byte[] sendbuf = new byte[maximumBufferSize + 12];
118: System.arraycopy(bid, 0, sendbuf, 0, 4);
119: System.arraycopy(pnum, 0, sendbuf, 4, 4);
120:
121: int len = maximumBufferSize;
122: if (data.length - j < maximumBufferSize) {
123: len = data.length - j;
124: }
125: // copy in a chunk of the data
126: System.arraycopy(data, j, sendbuf, 12, len);
127:
128: // set the order (sequence) of this packet
129: System.arraycopy(NumericUtils.toByteArray(k, false), 0,
130: sendbuf, 8, 4);
131:
132: // create the datagram packet
133: if (addr == null) {
134: packets[k] = new DatagramPacket(sendbuf, len + 12);
135: } else {
136: packets[k] = new DatagramPacket(sendbuf, len + 12,
137: addr, port);
138: }
139: }
140:
141: return packets;
142: }
143:
144: }
|