001: // $Id: PIGGYBACK.java,v 1.10.10.1 2007/04/27 08:03:51 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.stack.Protocol;
007: import org.jgroups.util.Queue;
008: import org.jgroups.util.QueueClosedException;
009: import org.jgroups.util.Util;
010:
011: import java.io.IOException;
012: import java.io.ObjectInput;
013: import java.io.ObjectOutput;
014: import java.util.Properties;
015: import java.util.Vector;
016:
017: /**
* Combines multiple messages into a single large one. As many messages as possible are combined into
* one; the combined message is sent after a maximum timeout or when its size would become too big. On
* the receiving side, the large message is split back into the smaller ones, which are then delivered.
021: */
022:
023: public class PIGGYBACK extends Protocol {
024: long max_wait_time = 20; // milliseconds: max. wait between consecutive msgs
025: long max_size = 8192; // don't piggyback if created msg would exceed this size (in bytes)
026: final Queue msg_queue = new Queue();
027: Packer packer = null;
028: boolean packing = false;
029: Address local_addr = null;
030:
031: class Packer implements Runnable {
032: Thread t = null;
033:
034: public void start() {
035: if (t == null) {
036: t = new Thread(this , "Packer thread");
037: t.setDaemon(true);
038: t.start();
039: }
040: }
041:
042: public void stop() {
043: t = null;
044: }
045:
046: public void run() {
047: long current_size = 0;
048: long start_time, time_to_wait = max_wait_time;
049: Message m, new_msg;
050: Vector msgs;
051:
052: while (packer != null) {
053: try {
054: m = (Message) msg_queue.remove();
055: m.setSrc(local_addr);
056: start_time = System.currentTimeMillis();
057: current_size = 0;
058: new_msg = new Message();
059: msgs = new Vector();
060: msgs.addElement(m);
061: current_size += m.size();
062:
063: while (System.currentTimeMillis() - start_time <= max_wait_time
064: && current_size <= max_size) {
065:
066: time_to_wait = max_wait_time
067: - (System.currentTimeMillis() - start_time);
068: if (time_to_wait <= 0)
069: break;
070:
071: try {
072: m = (Message) msg_queue.peek(time_to_wait);
073: m.setSrc(local_addr);
074: } catch (TimeoutException timeout) {
075: break;
076: }
077: if (m == null
078: || m.size() + current_size > max_size)
079: break;
080: m = (Message) msg_queue.remove();
081: current_size += m.size();
082: msgs.addElement(m);
083: }
084:
085: try {
086: new_msg.putHeader(getName(),
087: new PiggybackHeader());
088: new_msg
089: .setBuffer(Util
090: .objectToByteBuffer(msgs));
091: passDown(new Event(Event.MSG, new_msg));
092:
093: if (log.isInfoEnabled())
094: log.info("combined " + msgs.size()
095: + " messages of a total size of "
096: + current_size + " bytes");
097: } catch (Exception e) {
098: if (log.isWarnEnabled())
099: log.warn("exception is " + e);
100: }
101: } catch (QueueClosedException closed) {
102: if (log.isInfoEnabled())
103: log.info("packer stopped as queue is closed");
104: break;
105: }
106: }
107: }
108: }
109:
110: /**
111: * All protocol names have to be unique !
112: */
113: public String getName() {
114: return "PIGGYBACK";
115: }
116:
117: public boolean setProperties(Properties props) {
118: super .setProperties(props);
119: String str;
120:
121: str = props.getProperty("max_wait_time");
122: if (str != null) {
123: max_wait_time = Long.parseLong(str);
124: props.remove("max_wait_time");
125: }
126: str = props.getProperty("max_size");
127: if (str != null) {
128: max_size = Long.parseLong(str);
129: props.remove("max_size");
130: }
131:
132: if (props.size() > 0) {
133: log
134: .error("PIGGYBACK.setProperties(): these properties are not recognized: "
135: + props);
136:
137: return false;
138: }
139: return true;
140: }
141:
142: public void start() throws Exception {
143: startPacker();
144: }
145:
146: public void stop() {
147: packing = false;
148: msg_queue.close(true); // flush pending messages, this should also stop the packer ...
149: stopPacker(); // ... but for safety reasons, we stop it here again
150: }
151:
152: public void up(Event evt) {
153: Message msg;
154: Object obj;
155: Vector messages;
156:
157: switch (evt.getType()) {
158:
159: case Event.SET_LOCAL_ADDRESS:
160: local_addr = (Address) evt.getArg();
161: break;
162:
163: case Event.MSG:
164: msg = (Message) evt.getArg();
165: obj = msg.getHeader(getName());
166: if (obj == null || !(obj instanceof PiggybackHeader))
167: break;
168:
169: msg.removeHeader(getName());
170: try {
171: messages = (Vector) msg.getObject();
172: if (log.isInfoEnabled())
173: log.info("unpacking " + messages.size()
174: + " messages");
175: for (int i = 0; i < messages.size(); i++)
176: passUp(new Event(Event.MSG, messages.elementAt(i)));
177: } catch (Exception e) {
178: if (log.isWarnEnabled())
179: log
180: .warn("piggyback message does not contain a vector of "
181: + "piggybacked messages, discarding message ! Exception is "
182: + e);
183: return;
184: }
185:
186: return; // don't pass up !
187: }
188:
189: passUp(evt); // Pass up to the layer above us
190: }
191:
192: public void down(Event evt) {
193: Message msg;
194:
195: switch (evt.getType()) {
196:
197: case Event.MSG:
198: msg = (Message) evt.getArg();
199:
200: if (msg.getDest() != null
201: && !msg.getDest().isMulticastAddress())
202: break; // unicast message, handle as usual
203:
204: if (!packing)
205: break; // pass down as usual; we haven't started yet
206:
207: try {
208: msg_queue.add(msg);
209: } catch (QueueClosedException closed) {
210: break; // pass down regularly
211: }
212: return;
213: }
214:
215: passDown(evt); // Pass on to the layer below us
216: }
217:
218: void startPacker() {
219: if (packer == null) {
220: packing = true;
221: packer = new Packer();
222: packer.start();
223: }
224: }
225:
226: void stopPacker() {
227: if (packer != null) {
228: packer.stop();
229: packing = false;
230: msg_queue.close(false);
231: packer = null;
232: }
233: }
234:
235: public static class PiggybackHeader extends Header {
236:
237: public PiggybackHeader() {
238: }
239:
240: public String toString() {
241: return "[PIGGYBACK: <variables> ]";
242: }
243:
244: public void writeExternal(ObjectOutput out) throws IOException {
245: }
246:
247: public void readExternal(ObjectInput in) throws IOException,
248: ClassNotFoundException {
249: }
250:
251: }
252:
253: }
|