001: // $Id: FRAG.java,v 1.32.2.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.Address;
006: import org.jgroups.Event;
007: import org.jgroups.Message;
008: import org.jgroups.View;
009: import org.jgroups.stack.Protocol;
010: import org.jgroups.util.ExposedByteArrayOutputStream;
011: import org.jgroups.util.Util;
012:
013: import java.io.ByteArrayInputStream;
014: import java.io.DataInputStream;
015: import java.io.DataOutputStream;
016: import java.util.*;
017:
018: /**
019: * Fragmentation layer. Fragments messages larger than FRAG_SIZE into smaller packets.
020: * Reassembles fragmented packets into bigger ones. The fragmentation number is prepended
021: * to the messages as a header (and removed at the receiving side).<p>
022: * Each fragment is identified by (a) the sender (part of the message to which the header is appended),
023: * (b) the fragmentation ID (which is unique per FRAG layer (monotonically increasing) and (c) the
024: * fragement ID which ranges from 0 to number_of_fragments-1.<p>
025: * Requirement: lossless delivery (e.g. NAK, ACK). No requirement on ordering. Works for both unicast and
026: * multicast messages.
027: * @author Bela Ban
028: * @author Filip Hanik
029: * @version $Id: FRAG.java,v 1.32.2.1 2007/04/27 08:03:51 belaban Exp $
030: */
031: public class FRAG extends Protocol {
032: private int frag_size = 8192; // conservative value
033:
034: /*the fragmentation list contains a fragmentation table per sender
035: *this way it becomes easier to clean up if a sender (member) leaves or crashes
036: */
037: private final FragmentationList fragment_list = new FragmentationList();
038: private int curr_id = 1;
039: private final ExposedByteArrayOutputStream bos = new ExposedByteArrayOutputStream(
040: 1024);
041: private final Vector members = new Vector(11);
042: private final static String name = "FRAG";
043:
044: long num_sent_msgs = 0;
045: long num_sent_frags = 0;
046: long num_received_msgs = 0;
047: long num_received_frags = 0;
048:
049: public String getName() {
050: return name;
051: }
052:
053: public int getFragSize() {
054: return frag_size;
055: }
056:
057: public void setFragSize(int s) {
058: frag_size = s;
059: }
060:
061: public long getNumberOfSentMessages() {
062: return num_sent_msgs;
063: }
064:
065: public long getNumberOfSentFragments() {
066: return num_sent_frags;
067: }
068:
069: public long getNumberOfReceivedMessages() {
070: return num_received_msgs;
071: }
072:
073: public long getNumberOfReceivedFragments() {
074: return num_received_frags;
075: }
076:
077: /**
078: * Setup the Protocol instance acording to the configuration string
079: */
080: public boolean setProperties(Properties props) {
081: String str;
082:
083: super .setProperties(props);
084: str = props.getProperty("frag_size");
085: if (str != null) {
086: frag_size = Integer.parseInt(str);
087: props.remove("frag_size");
088: }
089:
090: if (props.size() > 0) {
091: log
092: .error("FRAG.setProperties(): the following properties are not recognized: "
093: + props);
094: return false;
095: }
096: return true;
097: }
098:
099: public void resetStats() {
100: super .resetStats();
101: num_sent_msgs = num_sent_frags = num_received_msgs = num_received_frags = 0;
102: }
103:
104: /**
105: * Fragment a packet if larger than frag_size (add a header). Otherwise just pass down. Only
106: * add a header if framentation is needed !
107: */
108: public void down(Event evt) {
109: switch (evt.getType()) {
110:
111: case Event.MSG:
112: Message msg = (Message) evt.getArg();
113: long size = msg.size();
114: num_sent_msgs++;
115: if (size > frag_size) {
116: if (log.isTraceEnabled()) {
117: StringBuffer sb = new StringBuffer(
118: "message size is ");
119: sb.append(size).append(
120: ", will fragment (frag_size=").append(
121: frag_size).append(')');
122: log.trace(sb.toString());
123: }
124: fragment(msg); // Fragment and pass down
125: return;
126: }
127: break;
128:
129: case Event.VIEW_CHANGE:
130: //don't do anything if this dude is sending out the view change
131: //we are receiving a view change,
132: //in here we check for the
133: View view = (View) evt.getArg();
134: Vector new_mbrs = view.getMembers(),
135: left_mbrs;
136: Address mbr;
137:
138: left_mbrs = Util.determineLeftMembers(members, new_mbrs);
139: members.clear();
140: members.addAll(new_mbrs);
141:
142: for (int i = 0; i < left_mbrs.size(); i++) {
143: mbr = (Address) left_mbrs.elementAt(i);
144: //the new view doesn't contain the sender, he must have left,
145: //hence we will clear all his fragmentation tables
146: fragment_list.remove(mbr);
147: if (log.isTraceEnabled())
148: log.trace("[VIEW_CHANGE] removed " + mbr
149: + " from fragmentation table");
150: }
151: break;
152:
153: case Event.CONFIG:
154: passDown(evt);
155: if (log.isDebugEnabled())
156: log.debug("received CONFIG event: " + evt.getArg());
157: handleConfigEvent((HashMap) evt.getArg());
158: return;
159: }
160:
161: passDown(evt); // Pass on to the layer below us
162: }
163:
164: /**
165: * If event is a message, if it is fragmented, re-assemble fragments into big message and pass up the stack.
166: */
167: public void up(Event evt) {
168: switch (evt.getType()) {
169:
170: case Event.MSG:
171: Message msg = (Message) evt.getArg();
172: Object obj = msg.getHeader(name);
173: if (obj != null && obj instanceof FragHeader) { // needs to be defragmented
174: unfragment(msg); // Unfragment and possibly pass up
175: return;
176: } else {
177: num_received_msgs++;
178: }
179: break;
180:
181: case Event.CONFIG:
182: passUp(evt);
183: if (log.isDebugEnabled())
184: log.debug("received CONFIG event: " + evt.getArg());
185: handleConfigEvent((HashMap) evt.getArg());
186: return;
187: }
188:
189: passUp(evt); // Pass up to the layer above us by default
190: }
191:
192: /**
193: * Send all fragments as separate messages (with same ID !).
194: * Example:
195: * <pre>
196: * Given the generated ID is 2344, number of fragments=3, message {dst,src,buf}
197: * would be fragmented into:
198: * <p/>
199: * [2344,3,0]{dst,src,buf1},
200: * [2344,3,1]{dst,src,buf2} and
201: * [2344,3,2]{dst,src,buf3}
202: * </pre>
203: */
204: private void fragment(Message msg) {
205: DataOutputStream out = null;
206: byte[] buffer;
207: byte[] fragments[];
208: Event evt;
209: FragHeader hdr;
210: Message frag_msg;
211: Address dest = msg.getDest(), src = msg.getSrc();
212: long id = curr_id++; // used as seqnos
213: int num_frags;
214: int size;
215:
216: try {
217: // Write message into a byte buffer and fragment it
218: // Synchronization around bos is needed for concurrent access (http://jira.jboss.com/jira/browse/JGRP-215)
219: synchronized (bos) {
220: bos.reset();
221: out = new DataOutputStream(bos);
222: msg.writeTo(out);
223: out.flush();
224: buffer = bos.getRawBuffer();
225: fragments = Util.fragmentBuffer(buffer, frag_size, bos
226: .size());
227: }
228:
229: num_frags = fragments.length;
230: num_sent_frags += num_frags;
231:
232: if (log.isTraceEnabled()) {
233: StringBuffer sb = new StringBuffer();
234: sb.append("fragmenting packet to ").append(
235: dest != null ? dest.toString()
236: : "<all members>");
237: sb.append(" (size=").append(buffer.length).append(
238: ") into ").append(num_frags);
239: sb.append(" fragment(s) [frag_size=").append(frag_size)
240: .append(']');
241: log.trace(sb.toString());
242: }
243:
244: for (int i = 0; i < num_frags; i++) {
245: frag_msg = new Message(dest, src, fragments[i]);
246: hdr = new FragHeader(id, i, num_frags);
247: frag_msg.putHeader(name, hdr);
248: evt = new Event(Event.MSG, frag_msg);
249: passDown(evt);
250: }
251: } catch (Exception e) {
252: log.error("exception occurred trying to fragment message",
253: e);
254: } finally {
255: Util.close(out);
256: }
257: }
258:
259: /**
260: * 1. Get all the fragment buffers
261: * 2. When all are received -> Assemble them into one big buffer
262: * 3. Read headers and byte buffer from big buffer
263: * 4. Set headers and buffer in msg
264: * 5. Pass msg up the stack
265: */
266: private void unfragment(Message msg) {
267: FragmentationTable frag_table;
268: Address sender = msg.getSrc();
269: Message assembled_msg;
270: FragHeader hdr = (FragHeader) msg.removeHeader(name);
271: byte[] m;
272: ByteArrayInputStream bis;
273: DataInputStream in = null;
274:
275: frag_table = fragment_list.get(sender);
276: if (frag_table == null) {
277: frag_table = new FragmentationTable(sender);
278: try {
279: fragment_list.add(sender, frag_table);
280: } catch (IllegalArgumentException x) { // the entry has already been added, probably in parallel from another thread
281: frag_table = fragment_list.get(sender);
282: }
283: }
284: num_received_frags++;
285: m = frag_table.add(hdr.id, hdr.frag_id, hdr.num_frags, msg
286: .getBuffer());
287: if (m != null) {
288: try {
289: bis = new ByteArrayInputStream(m);
290: in = new DataInputStream(bis);
291: assembled_msg = new Message(false);
292: assembled_msg.readFrom(in);
293: if (log.isTraceEnabled())
294: log.trace("assembled_msg is " + assembled_msg);
295: assembled_msg.setSrc(sender); // needed ? YES, because fragments have a null src !!
296: num_received_msgs++;
297: passUp(new Event(Event.MSG, assembled_msg));
298: } catch (Exception e) {
299: log.error("failed unfragmenting a message", e);
300: } finally {
301: Util.close(in);
302: }
303: }
304: }
305:
306: void handleConfigEvent(HashMap map) {
307: if (map == null)
308: return;
309: if (map.containsKey("frag_size")) {
310: frag_size = ((Integer) map.get("frag_size")).intValue();
311: if (log.isDebugEnabled())
312: log.debug("setting frag_size=" + frag_size);
313: }
314: }
315:
316: /**
317: * A fragmentation list keeps a list of fragmentation tables
318: * sorted by an Address ( the sender ).
319: * This way, if the sender disappears or leaves the group half way
320: * sending the content, we can simply remove this members fragmentation
321: * table and clean up the memory of the receiver.
322: * We do not have to do the same for the sender, since the sender doesn't keep a fragmentation table
323: */
324: static class FragmentationList {
325: /* initialize the hashtable to hold all the fragmentation tables
326: * 11 is the best growth capacity to start with<br/>
327: * HashMap<Address,FragmentationTable>
328: */
329: private final HashMap frag_tables = new HashMap(11);
330:
331: /**
332: * Adds a fragmentation table for this particular sender
333: * If this sender already has a fragmentation table, an IllegalArgumentException
334: * will be thrown.
335: * @param sender - the address of the sender, cannot be null
336: * @param table - the fragmentation table of this sender, cannot be null
337: * @throws IllegalArgumentException if an entry for this sender already exist
338: */
339: public void add(Address sender, FragmentationTable table)
340: throws IllegalArgumentException {
341: FragmentationTable healthCheck;
342:
343: synchronized (frag_tables) {
344: healthCheck = (FragmentationTable) frag_tables
345: .get(sender);
346: if (healthCheck == null) {
347: frag_tables.put(sender, table);
348: } else {
349: throw new IllegalArgumentException(
350: "Sender <"
351: + sender
352: + "> already exists in the fragementation list");
353: }
354: }
355: }
356:
357: /**
358: * returns a fragmentation table for this sender
359: * returns null if the sender doesn't have a fragmentation table
360: * @return the fragmentation table for this sender, or null if no table exist
361: */
362: public FragmentationTable get(Address sender) {
363: synchronized (frag_tables) {
364: return (FragmentationTable) frag_tables.get(sender);
365: }
366: }
367:
368: /**
369: * returns true if this sender already holds a
370: * fragmentation for this sender, false otherwise
371: * @param sender - the sender, cannot be null
372: * @return true if this sender already has a fragmentation table
373: */
374: public boolean containsSender(Address sender) {
375: synchronized (frag_tables) {
376: return frag_tables.containsKey(sender);
377: }
378: }
379:
380: /**
381: * removes the fragmentation table from the list.
382: * after this operation, the fragementation list will no longer
383: * hold a reference to this sender's fragmentation table
384: * @param sender - the sender who's fragmentation table you wish to remove, cannot be null
385: * @return true if the table was removed, false if the sender doesn't have an entry
386: */
387: public boolean remove(Address sender) {
388: synchronized (frag_tables) {
389: boolean result = containsSender(sender);
390: frag_tables.remove(sender);
391: return result;
392: }
393: }
394:
395: /**
396: * returns a list of all the senders that have fragmentation tables opened.
397: * @return an array of all the senders in the fragmentation list
398: */
399: public Address[] getSenders() {
400: Address[] result;
401: int index = 0;
402:
403: synchronized (frag_tables) {
404: result = new Address[frag_tables.size()];
405: for (Iterator it = frag_tables.keySet().iterator(); it
406: .hasNext();) {
407: result[index++] = (Address) it.next();
408: }
409: }
410: return result;
411: }
412:
413: public String toString() {
414: Map.Entry entry;
415: StringBuffer buf = new StringBuffer(
416: "Fragmentation list contains ");
417: synchronized (frag_tables) {
418: buf.append(frag_tables.size()).append(" tables\n");
419: for (Iterator it = frag_tables.entrySet().iterator(); it
420: .hasNext();) {
421: entry = (Map.Entry) it.next();
422: buf.append(entry.getKey()).append(": ").append(
423: entry.getValue()).append("\n");
424: }
425: }
426: return buf.toString();
427: }
428:
429: }
430:
431: /**
432: * Keeps track of the fragments that are received.
433: * Reassembles fragements into entire messages when all fragments have been received.
434: * The fragmentation holds a an array of byte arrays for a unique sender
435: * The first dimension of the array is the order of the fragmentation, in case the arrive out of order
436: */
437: static class FragmentationTable {
438: private final Address sender;
439: /* the hashtable that holds the fragmentation entries for this sender*/
440: private final Hashtable h = new Hashtable(11); // keys: frag_ids, vals: Entrys
441:
442: FragmentationTable(Address sender) {
443: this .sender = sender;
444: }
445:
446: /**
447: * inner class represents an entry for a message
448: * each entry holds an array of byte arrays sorted
449: * once all the byte buffer entries have been filled
450: * the fragmentation is considered complete.
451: */
452: static class Entry {
453: //the total number of fragment in this message
454: int tot_frags = 0;
455: // each fragment is a byte buffer
456: byte[] fragments[] = null;
457: //the number of fragments we have received
458: int number_of_frags_recvd = 0;
459: // the message ID
460: long msg_id = -1;
461:
462: /**
463: * Creates a new entry
464: *
465: * @param tot_frags the number of fragments to expect for this message
466: */
467: Entry(long msg_id, int tot_frags) {
468: this .msg_id = msg_id;
469: this .tot_frags = tot_frags;
470: fragments = new byte[tot_frags][];
471: for (int i = 0; i < tot_frags; i++) {
472: fragments[i] = null;
473: }
474: }
475:
476: /**
477: * adds on fragmentation buffer to the message
478: *
479: * @param frag_id the number of the fragment being added 0..(tot_num_of_frags - 1)
480: * @param frag the byte buffer containing the data for this fragmentation, should not be null
481: */
482: public void set(int frag_id, byte[] frag) {
483: fragments[frag_id] = frag;
484: number_of_frags_recvd++;
485: }
486:
487: /**
488: * returns true if this fragmentation is complete
489: * ie, all fragmentations have been received for this buffer
490: */
491: public boolean isComplete() {
492: /*first make the simple check*/
493: if (number_of_frags_recvd < tot_frags) {
494: return false;
495: }
496: /*then double check just in case*/
497: for (int i = 0; i < fragments.length; i++) {
498: if (fragments[i] == null)
499: return false;
500: }
501: /*all fragmentations have been received*/
502: return true;
503: }
504:
505: /**
506: * Assembles all the fragmentations into one buffer
507: * this method does not check if the fragmentation is complete
508: *
509: * @return the complete message in one buffer
510: */
511: public byte[] assembleBuffer() {
512: return Util.defragmentBuffer(fragments);
513: }
514:
515: /**
516: * debug only
517: */
518: public String toString() {
519: StringBuffer ret = new StringBuffer();
520: ret.append("[tot_frags=").append(tot_frags).append(
521: ", number_of_frags_recvd=").append(
522: number_of_frags_recvd).append(']');
523: return ret.toString();
524: }
525:
526: public int hashCode() {
527: return super .hashCode();
528: }
529: }
530:
531: /**
532: * Creates a new entry if not yet present. Adds the fragment.
533: * If all fragements for a given message have been received,
534: * an entire message is reassembled and returned.
535: * Otherwise null is returned.
536: *
537: * @param id - the message ID, unique for a sender
538: * @param frag_id the index of this fragmentation (0..tot_frags-1)
539: * @param tot_frags the total number of fragmentations expected
540: * @param fragment - the byte buffer for this fragment
541: */
542: public synchronized byte[] add(long id, int frag_id,
543: int tot_frags, byte[] fragment) {
544:
545: /*initialize the return value to default not complete */
546: byte[] retval = null;
547:
548: Entry e = (Entry) h.get(new Long(id));
549:
550: if (e == null) { // Create new entry if not yet present
551: e = new Entry(id, tot_frags);
552: h.put(new Long(id), e);
553: }
554:
555: e.set(frag_id, fragment);
556: if (e.isComplete()) {
557: retval = e.assembleBuffer();
558: h.remove(new Long(id));
559: }
560:
561: return retval;
562: }
563:
564: public void reset() {
565: }
566:
567: public String toString() {
568: StringBuffer buf = new StringBuffer(
569: "Fragmentation Table Sender:").append(sender)
570: .append("\n\t");
571: java.util.Enumeration e = this .h.elements();
572: while (e.hasMoreElements()) {
573: Entry entry = (Entry) e.nextElement();
574: int count = 0;
575: for (int i = 0; i < entry.fragments.length; i++) {
576: if (entry.fragments[i] != null) {
577: count++;
578: }
579: }
580: buf.append("Message ID:").append(entry.msg_id).append(
581: "\n\t");
582: buf.append("Total Frags:").append(entry.tot_frags)
583: .append("\n\t");
584: buf.append("Frags Received:").append(count).append(
585: "\n\n");
586: }
587: return buf.toString();
588: }
589: }
590:
591: }
|