001: package org.jgroups.protocols;
002:
003: import org.jgroups.Address;
004: import org.jgroups.Event;
005: import org.jgroups.Message;
006: import org.jgroups.View;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.util.Range;
009: import org.jgroups.util.Util;
010:
011: import java.util.*;
012:
013: /**
014: * Fragmentation layer. Fragments messages larger than frag_size into smaller packets.
015: * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended
016: * to the messages as a header (and removed at the receiving side).<p>
017: * Each fragment is identified by (a) the sender (part of the message to which the header is appended),
018: * (b) the fragmentation ID (which is unique per FRAG2 layer (monotonically increasing) and (c) the
019: * fragement ID which ranges from 0 to number_of_fragments-1.<p>
020: * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and
021: * multicast messages.<br/>
022: * Compared to FRAG, this protocol does <em>not</em> need to serialize the message in order to break it into
023: * smaller fragments: it looks only at the message's buffer, which is a byte[] array anyway. We assume that the
024: * size addition for headers and src and dest address is minimal when the transport finally has to serialize the
025: * message, so we add a constant (200 bytes).
026: * @author Bela Ban
027: * @version $Id: FRAG2.java,v 1.25.2.1 2007/04/27 08:03:51 belaban Exp $
028: */
029: public class FRAG2 extends Protocol {
030:
031: /** The max number of bytes in a message. If a message's buffer is bigger, it will be fragmented */
032: int frag_size = 1500;
033:
034: /** Number of bytes that we think the headers plus src and dest will take up when
035: message is serialized by transport. This will be subtracted from frag_size */
036: int overhead = 200;
037:
038: /*the fragmentation list contains a fragmentation table per sender
039: *this way it becomes easier to clean up if a sender (member) leaves or crashes
040: */
041: private final FragmentationList fragment_list = new FragmentationList();
042: private int curr_id = 1;
043: private final Vector members = new Vector(11);
044: private static final String name = "FRAG2";
045:
046: long num_sent_msgs = 0;
047: long num_sent_frags = 0;
048: long num_received_msgs = 0;
049: long num_received_frags = 0;
050:
051: public final String getName() {
052: return name;
053: }
054:
055: public int getFragSize() {
056: return frag_size;
057: }
058:
059: public void setFragSize(int s) {
060: frag_size = s;
061: }
062:
063: public int getOverhead() {
064: return overhead;
065: }
066:
067: public void setOverhead(int o) {
068: overhead = o;
069: }
070:
071: public long getNumberOfSentMessages() {
072: return num_sent_msgs;
073: }
074:
075: public long getNumberOfSentFragments() {
076: return num_sent_frags;
077: }
078:
079: public long getNumberOfReceivedMessages() {
080: return num_received_msgs;
081: }
082:
083: public long getNumberOfReceivedFragments() {
084: return num_received_frags;
085: }
086:
087: synchronized int getNextId() {
088: return curr_id++;
089: }
090:
091: /** Setup the Protocol instance acording to the configuration string */
092: public boolean setProperties(Properties props) {
093: String str;
094:
095: super .setProperties(props);
096: str = props.getProperty("frag_size");
097: if (str != null) {
098: frag_size = Integer.parseInt(str);
099: props.remove("frag_size");
100: }
101:
102: str = props.getProperty("overhead");
103: if (str != null) {
104: overhead = Integer.parseInt(str);
105: props.remove("overhead");
106: }
107:
108: int old_frag_size = frag_size;
109: frag_size -= overhead;
110: if (frag_size <= 0) {
111: log.error("frag_size=" + old_frag_size + ", overhead="
112: + overhead + ", new frag_size=" + frag_size
113: + ": new frag_size is invalid");
114: return false;
115: }
116:
117: if (log.isInfoEnabled())
118: log.info("frag_size=" + old_frag_size + ", overhead="
119: + overhead + ", new frag_size=" + frag_size);
120:
121: if (props.size() > 0) {
122: log
123: .error("FRAG2.setProperties(): the following properties are not recognized: "
124: + props);
125: return false;
126: }
127: return true;
128: }
129:
130: public void resetStats() {
131: super .resetStats();
132: num_sent_msgs = num_sent_frags = num_received_msgs = num_received_frags = 0;
133: }
134:
135: /**
136: * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only
137: * add a header if framentation is needed !
138: */
139: public void down(Event evt) {
140: switch (evt.getType()) {
141:
142: case Event.MSG:
143: Message msg = (Message) evt.getArg();
144: long size = msg.getLength();
145: synchronized (this ) {
146: num_sent_msgs++;
147: }
148: if (size > frag_size) {
149: if (log.isTraceEnabled()) {
150: StringBuffer sb = new StringBuffer(
151: "message's buffer size is ");
152: sb.append(size).append(", will fragment ").append(
153: "(frag_size=");
154: sb.append(frag_size).append(')');
155: log.trace(sb.toString());
156: }
157: fragment(msg); // Fragment and pass down
158: return;
159: }
160: break;
161:
162: case Event.VIEW_CHANGE:
163: //don't do anything if this dude is sending out the view change
164: //we are receiving a view change,
165: //in here we check for the
166: View view = (View) evt.getArg();
167: Vector new_mbrs = view.getMembers(),
168: left_mbrs;
169: Address mbr;
170:
171: left_mbrs = Util.determineLeftMembers(members, new_mbrs);
172: members.clear();
173: members.addAll(new_mbrs);
174:
175: for (int i = 0; i < left_mbrs.size(); i++) {
176: mbr = (Address) left_mbrs.elementAt(i);
177: //the new view doesn't contain the sender, he must have left,
178: //hence we will clear all his fragmentation tables
179: fragment_list.remove(mbr);
180: if (log.isTraceEnabled())
181: log.trace("[VIEW_CHANGE] removed " + mbr
182: + " from fragmentation table");
183: }
184: break;
185:
186: case Event.CONFIG:
187: passDown(evt);
188: if (log.isDebugEnabled())
189: log.debug("received CONFIG event: " + evt.getArg());
190: handleConfigEvent((HashMap) evt.getArg());
191: return;
192: }
193:
194: passDown(evt); // Pass on to the layer below us
195: }
196:
197: /**
198: * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up
199: * the stack.
200: */
201: public void up(Event evt) {
202: switch (evt.getType()) {
203:
204: case Event.MSG:
205: Message msg = (Message) evt.getArg();
206: Object obj = msg.getHeader(name);
207: if (obj != null && obj instanceof FragHeader) { // needs to be defragmented
208: unfragment(msg); // Unfragment and possibly pass up
209: return;
210: } else {
211: num_received_msgs++;
212: }
213: break;
214:
215: case Event.CONFIG:
216: passUp(evt);
217: if (log.isInfoEnabled())
218: log.info("received CONFIG event: " + evt.getArg());
219: handleConfigEvent((HashMap) evt.getArg());
220: return;
221: }
222:
223: passUp(evt); // Pass up to the layer above us by default
224: }
225:
226: /** Send all fragments as separate messages (with same ID !).
227: Example:
228: <pre>
229: Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}
230: would be fragmented into:
231:
232: [2344,3,0]{dst,src,buf1},
233: [2344,3,1]{dst,src,buf2} and
234: [2344,3,2]{dst,src,buf3}
235: </pre>
236: */
237: void fragment(Message msg) {
238: byte[] buffer;
239: List fragments;
240: Event evt;
241: FragHeader hdr;
242: Message frag_msg;
243: Address dest = msg.getDest();
244: long id = getNextId(); // used as seqnos
245: int num_frags;
246: StringBuffer sb;
247: Range r;
248:
249: try {
250: buffer = msg.getBuffer();
251: fragments = Util.computeFragOffsets(buffer, frag_size);
252: num_frags = fragments.size();
253: synchronized (this ) {
254: num_sent_frags += num_frags;
255: }
256:
257: if (log.isTraceEnabled()) {
258: sb = new StringBuffer("fragmenting packet to ");
259: sb.append(
260: (dest != null ? dest.toString()
261: : "<all members>")).append(" (size=")
262: .append(buffer.length);
263: sb.append(") into ").append(num_frags).append(
264: " fragment(s) [frag_size=").append(frag_size)
265: .append(']');
266: log.trace(sb.toString());
267: }
268:
269: for (int i = 0; i < fragments.size(); i++) {
270: r = (Range) fragments.get(i);
271: // Copy the original msg (needed because we need to copy the headers too)
272: frag_msg = msg.copy(false); // don't copy the buffer, only src, dest and headers
273: frag_msg.setBuffer(buffer, (int) r.low, (int) r.high);
274: hdr = new FragHeader(id, i, num_frags);
275: frag_msg.putHeader(name, hdr);
276: evt = new Event(Event.MSG, frag_msg);
277: passDown(evt);
278: }
279: } catch (Exception e) {
280: if (log.isErrorEnabled())
281: log.error("fragmentation failure", e);
282: }
283: }
284:
285: /**
286: 1. Get all the fragment buffers
287: 2. When all are received -> Assemble them into one big buffer
288: 3. Read headers and byte buffer from big buffer
289: 4. Set headers and buffer in msg
290: 5. Pass msg up the stack
291: */
292: void unfragment(Message msg) {
293: FragmentationTable frag_table;
294: Address sender = msg.getSrc();
295: Message assembled_msg;
296: FragHeader hdr = (FragHeader) msg.removeHeader(name);
297:
298: frag_table = fragment_list.get(sender);
299: if (frag_table == null) {
300: frag_table = new FragmentationTable(sender);
301: try {
302: fragment_list.add(sender, frag_table);
303: } catch (IllegalArgumentException x) { // the entry has already been added, probably in parallel from another thread
304: frag_table = fragment_list.get(sender);
305: }
306: }
307: num_received_frags++;
308: assembled_msg = frag_table.add(hdr.id, hdr.frag_id,
309: hdr.num_frags, msg);
310: if (assembled_msg != null) {
311: try {
312: if (log.isTraceEnabled())
313: log.trace("assembled_msg is " + assembled_msg);
314: assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
315: num_received_msgs++;
316: passUp(new Event(Event.MSG, assembled_msg));
317: } catch (Exception e) {
318: if (log.isErrorEnabled())
319: log.error("unfragmentation failed", e);
320: }
321: }
322: }
323:
324: void handleConfigEvent(HashMap map) {
325: if (map == null)
326: return;
327: if (map.containsKey("frag_size")) {
328: frag_size = ((Integer) map.get("frag_size")).intValue();
329: if (log.isDebugEnabled())
330: log.debug("setting frag_size=" + frag_size);
331: }
332: }
333:
334: /**
335: * A fragmentation list keeps a list of fragmentation tables
336: * sorted by an Address ( the sender ).
337: * This way, if the sender disappears or leaves the group half way
338: * sending the content, we can simply remove this members fragmentation
339: * table and clean up the memory of the receiver.
340: * We do not have to do the same for the sender, since the sender doesn't keep a fragmentation table
341: */
342: static class FragmentationList {
343: /* * HashMap<Address,FragmentationTable>, initialize the hashtable to hold all the fragmentation
344: * tables (11 is the best growth capacity to start with)
345: */
346: private final HashMap frag_tables = new HashMap(11);
347:
348: /**
349: * Adds a fragmentation table for this particular sender
350: * If this sender already has a fragmentation table, an IllegalArgumentException
351: * will be thrown.
352: * @param sender - the address of the sender, cannot be null
353: * @param table - the fragmentation table of this sender, cannot be null
354: * @exception IllegalArgumentException if an entry for this sender already exist
355: */
356: public void add(Address sender, FragmentationTable table)
357: throws IllegalArgumentException {
358: FragmentationTable healthCheck;
359:
360: synchronized (frag_tables) {
361: healthCheck = (FragmentationTable) frag_tables
362: .get(sender);
363: if (healthCheck == null) {
364: frag_tables.put(sender, table);
365: } else {
366: throw new IllegalArgumentException(
367: "Sender <"
368: + sender
369: + "> already exists in the fragementation list.");
370: }
371: }
372: }
373:
374: /**
375: * returns a fragmentation table for this sender
376: * returns null if the sender doesn't have a fragmentation table
377: * @return the fragmentation table for this sender, or null if no table exist
378: */
379: public FragmentationTable get(Address sender) {
380: synchronized (frag_tables) {
381: return (FragmentationTable) frag_tables.get(sender);
382: }
383: }
384:
385: /**
386: * returns true if this sender already holds a
387: * fragmentation for this sender, false otherwise
388: * @param sender - the sender, cannot be null
389: * @return true if this sender already has a fragmentation table
390: */
391: public boolean containsSender(Address sender) {
392: synchronized (frag_tables) {
393: return frag_tables.containsKey(sender);
394: }
395: }
396:
397: /**
398: * removes the fragmentation table from the list.
399: * after this operation, the fragementation list will no longer
400: * hold a reference to this sender's fragmentation table
401: * @param sender - the sender who's fragmentation table you wish to remove, cannot be null
402: * @return true if the table was removed, false if the sender doesn't have an entry
403: */
404: public boolean remove(Address sender) {
405: synchronized (frag_tables) {
406: boolean result = containsSender(sender);
407: frag_tables.remove(sender);
408: return result;
409: }
410: }
411:
412: /**
413: * returns a list of all the senders that have fragmentation tables
414: * opened.
415: * @return an array of all the senders in the fragmentation list
416: */
417: public Address[] getSenders() {
418: Address[] result;
419: int index = 0;
420:
421: synchronized (frag_tables) {
422: result = new Address[frag_tables.size()];
423: for (Iterator it = frag_tables.keySet().iterator(); it
424: .hasNext();) {
425: result[index++] = (Address) it.next();
426: }
427: }
428: return result;
429: }
430:
431: public String toString() {
432: Map.Entry entry;
433: StringBuffer buf = new StringBuffer(
434: "Fragmentation list contains ");
435: synchronized (frag_tables) {
436: buf.append(frag_tables.size()).append(" tables\n");
437: for (Iterator it = frag_tables.entrySet().iterator(); it
438: .hasNext();) {
439: entry = (Map.Entry) it.next();
440: buf.append(entry.getKey()).append(": ").append(
441: entry.getValue()).append("\n");
442: }
443: }
444: return buf.toString();
445: }
446:
447: }
448:
449: /**
450: * Keeps track of the fragments that are received.
451: * Reassembles fragements into entire messages when all fragments have been received.
452: * The fragmentation holds a an array of byte arrays for a unique sender
453: * The first dimension of the array is the order of the fragmentation, in case the arrive out of order
454: */
455: static class FragmentationTable {
456: private final Address sender;
457: /* the hashtable that holds the fragmentation entries for this sender*/
458: private final Hashtable h = new Hashtable(11); // keys: frag_ids, vals: Entrys
459:
460: FragmentationTable(Address sender) {
461: this .sender = sender;
462: }
463:
464: /**
465: * inner class represents an entry for a message
466: * each entry holds an array of byte arrays sorted
467: * once all the byte buffer entries have been filled
468: * the fragmentation is considered complete.
469: */
470: static class Entry {
471: //the total number of fragment in this message
472: int tot_frags = 0;
473: // each fragment is a byte buffer
474: Message fragments[] = null;
475: //the number of fragments we have received
476: int number_of_frags_recvd = 0;
477: // the message ID
478: long msg_id = -1;
479:
480: /**
481: * Creates a new entry
482: * @param tot_frags the number of fragments to expect for this message
483: */
484: Entry(long msg_id, int tot_frags) {
485: this .msg_id = msg_id;
486: this .tot_frags = tot_frags;
487: fragments = new Message[tot_frags];
488: for (int i = 0; i < tot_frags; i++)
489: fragments[i] = null;
490: }
491:
492: /**
493: * adds on fragmentation buffer to the message
494: * @param frag_id the number of the fragment being added 0..(tot_num_of_frags - 1)
495: * @param frag the byte buffer containing the data for this fragmentation, should not be null
496: */
497: public void set(int frag_id, Message frag) {
498: // don't count an already received fragment (should not happen though because the
499: // reliable transmission protocol(s) below should weed out duplicates
500: if (fragments[frag_id] == null) {
501: fragments[frag_id] = frag;
502: number_of_frags_recvd++;
503: }
504: }
505:
506: /** returns true if this fragmentation is complete
507: * ie, all fragmentations have been received for this buffer
508: *
509: */
510: public boolean isComplete() {
511: /*first make the simple check*/
512: if (number_of_frags_recvd < tot_frags) {
513: return false;
514: }
515: /*then double check just in case*/
516: for (int i = 0; i < fragments.length; i++) {
517: if (fragments[i] == null)
518: return false;
519: }
520: /*all fragmentations have been received*/
521: return true;
522: }
523:
524: /**
525: * Assembles all the fragments into one buffer. Takes all Messages, and combines their buffers into one
526: * buffer.
527: * This method does not check if the fragmentation is complete (use {@link #isComplete()} to verify
528: * before calling this method)
529: * @return the complete message in one buffer
530: *
531: */
532: public Message assembleMessage() {
533: Message retval;
534: byte[] combined_buffer, tmp;
535: int combined_length = 0, length, offset;
536: Message fragment;
537: int index = 0;
538:
539: for (int i = 0; i < fragments.length; i++) {
540: fragment = fragments[i];
541: combined_length += fragment.getLength();
542: }
543:
544: combined_buffer = new byte[combined_length];
545: for (int i = 0; i < fragments.length; i++) {
546: fragment = fragments[i];
547: tmp = fragment.getRawBuffer();
548: length = fragment.getLength();
549: offset = fragment.getOffset();
550: System.arraycopy(tmp, offset, combined_buffer,
551: index, length);
552: index += length;
553: }
554:
555: retval = fragments[0].copy(false);
556: retval.setBuffer(combined_buffer);
557: return retval;
558: }
559:
560: /**
561: * debug only
562: */
563: public String toString() {
564: StringBuffer ret = new StringBuffer();
565: ret.append("[tot_frags=").append(tot_frags).append(
566: ", number_of_frags_recvd=").append(
567: number_of_frags_recvd).append(']');
568: return ret.toString();
569: }
570:
571: public int hashCode() {
572: return super .hashCode();
573: }
574: }
575:
576: /**
577: * Creates a new entry if not yet present. Adds the fragment.
578: * If all fragements for a given message have been received,
579: * an entire message is reassembled and returned.
580: * Otherwise null is returned.
581: * @param id - the message ID, unique for a sender
582: * @param frag_id the index of this fragmentation (0..tot_frags-1)
583: * @param tot_frags the total number of fragmentations expected
584: * @param fragment - the byte buffer for this fragment
585: */
586: public synchronized Message add(long id, int frag_id,
587: int tot_frags, Message fragment) {
588: Message retval = null;
589:
590: Entry e = (Entry) h.get(new Long(id));
591:
592: if (e == null) { // Create new entry if not yet present
593: e = new Entry(id, tot_frags);
594: h.put(new Long(id), e);
595: }
596:
597: e.set(frag_id, fragment);
598: if (e.isComplete()) {
599: retval = e.assembleMessage();
600: h.remove(new Long(id));
601: }
602:
603: return retval;
604: }
605:
606: public void reset() {
607: }
608:
609: public String toString() {
610: StringBuffer buf = new StringBuffer(
611: "Fragmentation Table Sender:").append(sender)
612: .append("\n\t");
613: java.util.Enumeration e = this .h.elements();
614: while (e.hasMoreElements()) {
615: Entry entry = (Entry) e.nextElement();
616: int count = 0;
617: for (int i = 0; i < entry.fragments.length; i++) {
618: if (entry.fragments[i] != null) {
619: count++;
620: }
621: }
622: buf.append("Message ID:").append(entry.msg_id).append(
623: "\n\t");
624: buf.append("Total Frags:").append(entry.tot_frags)
625: .append("\n\t");
626: buf.append("Frags Received:").append(count).append(
627: "\n\n");
628: }
629: return buf.toString();
630: }
631: }
632:
633: }
|