001: // $Id: WANPIPE.java,v 1.7 2005/05/30 14:31:24 belaban Exp $
002:
003: package org.jgroups.protocols;
004:
005: import org.jgroups.*;
006: import org.jgroups.blocks.LogicalLink;
007: import org.jgroups.stack.Protocol;
008: import org.jgroups.util.List;
009: import org.jgroups.util.Util;
010:
011: import java.io.IOException;
012: import java.io.ObjectInput;
013: import java.io.ObjectOutput;
014: import java.net.InetAddress;
015: import java.util.Enumeration;
016: import java.util.Properties;
017: import java.util.StringTokenizer;
018: import java.util.Vector;
019:
020: /**
021: Logical point-to-point link. Uses multiple physical links to provide a reliable transport. For example,
022: if there are 2 physical links over different networks, and one of them fails, the WAN pipe will still be
023: able to send traffic over the other link. Currently traffic is sent over the physical links round-robin,
024: but this will be made configurable in the future. Example: 70% over first link, 30% over second, or
025: packets are split and sent across both links (increasing the available bandwidth).
026: */
027: public class WANPIPE extends Protocol implements LogicalLink.Receiver {
028: LogicalLink pipe = null;
029: String name = null; // logical name of WAN pipe
030: final List links = new List(); // contains the parsed link descriptions
031:
032: Address local_addr = null;
033: String group_addr = null;
034: final Properties properties = null;
035: final Vector members = new Vector();
036:
037: public WANPIPE() {
038: pipe = new LogicalLink(this );
039: }
040:
041: public String toString() {
042: return "Protocol WANPIPE(local address: " + local_addr + ')';
043: }
044:
045: public String getName() {
046: return "WANPIPE";
047: }
048:
049: /**
050: Sent to destination(s) using the WAN pipe. Send local messages directly back up the stack
051: */
052: public void down(Event evt) {
053: Message msg, rsp, copy;
054: Address dest_addr;
055:
056: if (evt.getType() != Event.MSG) {
057: handleDownEvent(evt);
058: return;
059: }
060:
061: msg = (Message) evt.getArg();
062: dest_addr = msg.getDest();
063:
064: if (dest_addr == null) { // send both local and remote
065: for (int i = 0; i < members.size(); i++) {
066: dest_addr = (Address) members.elementAt(i);
067:
068: if (dest_addr.equals(local_addr)) { // local or ...
069: returnLocal(msg);
070: } else { // remote
071: copy = msg.copy();
072: copy.setDest(dest_addr);
073: copy.putHeader(getName(), new WanPipeHeader(
074: group_addr));
075: sendUnicastMessage(copy);
076: }
077: }
078: } else {
079: if (dest_addr.equals(local_addr)) { // destination can either be local ...
080: returnLocal(msg);
081: } else { // or remote
082: msg.putHeader(getName(), new WanPipeHeader(group_addr));
083: sendUnicastMessage(msg);
084: }
085: }
086: }
087:
088: /** Make a response and send back up the same stack it came down */
089: void returnLocal(Message msg) {
090: Message rsp = msg.copy();
091: rsp.setDest(local_addr);
092: rsp.setSrc(local_addr);
093: passUp(new Event(Event.MSG, rsp));
094: }
095:
096: public void start() throws Exception {
097: LinkInfo l;
098:
099: for (Enumeration e = links.elements(); e.hasMoreElements();) {
100: l = (LinkInfo) e.nextElement();
101: pipe.addLink(l.local_addr, l.local_port, l.remote_addr,
102: l.remote_port);
103: }
104: pipe.start();
105: local_addr = new WanPipeAddress(name); // logical address for the WAN pipe
106: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
107: }
108:
109: public void stop() {
110: pipe.stop();
111: pipe.removeAllLinks();
112: }
113:
114: // LogicalLink.Receiver interface
115: public void receive(byte[] buf) {
116: WanPipeHeader hdr = null;
117: Message msg = null;
118:
119: try {
120: msg = (Message) Util.objectFromByteBuffer(buf);
121: } catch (Exception e) {
122: log.error("WANPIPE.receive(): " + e);
123: return;
124: }
125:
126: if (log.isInfoEnabled())
127: log.info("received msg " + msg);
128: hdr = (WanPipeHeader) msg.removeHeader(getName());
129:
130: /* Discard all messages destined for a channel with a different name */
131: String ch_name = null;
132:
133: if (hdr.group_addr != null)
134: ch_name = hdr.group_addr;
135:
136: if (group_addr == null) {
137: log
138: .error("WANPIPE.receive(): group address in header was null, discarded");
139: return;
140: }
141:
142: if (ch_name != null && !group_addr.equals(ch_name))
143: return;
144:
145: passUp(new Event(Event.MSG, msg));
146: }
147:
148: public void linkDown(InetAddress local, int local_port,
149: InetAddress remote, int remote_port) {
150: Object p = getPeer();
151:
152: passUp(new Event(Event.SUSPECT, p));
153: }
154:
155: public void linkUp(InetAddress local, int local_port,
156: InetAddress remote, int remote_port) {
157:
158: }
159:
160: public void missedHeartbeat(InetAddress local, int local_port,
161: InetAddress remote, int remote_port, int num_hbs) {
162:
163: }
164:
165: public void receivedHeartbeatAgain(InetAddress local,
166: int local_port, InetAddress remote, int remote_port) {
167:
168: }
169:
170: /** Setup the Protocol instance acording to the configuration string */
171: public boolean setProperties(Properties props) {
172: super .setProperties(props);
173: String str;
174:
175: str = props.getProperty("name");
176: if (str != null) {
177: name = str;
178: props.remove("name");
179: }
180:
181: str = props.getProperty("links");
182: if (str != null) {
183:
184: // parse links and put them in list (as LinkInfos)
185: if (parseLinks(str) == false)
186: return false;
187: props.remove("links");
188: }
189:
190: if (name == null || name.length() == 0) {
191: log.error("WANPIPE.setProperties(): 'name' must be set");
192: return false;
193: }
194: if (links.size() == 0) {
195: log
196: .error("WANPIPE.setProperties(): no links specified (at least 1 link must be present)");
197: return false;
198: }
199:
200: if (props.size() > 0) {
201: log
202: .error("WANPIPE.setProperties(): the following properties are not recognized: "
203: + props);
204:
205: return false;
206: }
207: return true;
208: }
209:
210: /** Parse link spec and put each link into 'links' (as LinkInfo) <br>
211: Example: <pre> [daddy@6666,daddy@7777,daddy@7777,sindhu@6666] </pre>*/
212: boolean parseLinks(String s) {
213: LinkInfo info;
214: StringTokenizer tok;
215: String src, dst;
216: int index = 0; // holds position of '@'
217:
218: s = s.replace('[', ' ');
219: s = s.replace(']', ' ');
220: s = s.trim();
221: tok = new StringTokenizer(s, ",");
222: while (tok.hasMoreElements()) {
223: src = tok.nextToken().trim();
224: dst = tok.nextToken().trim();
225: info = new LinkInfo();
226:
227: index = src.indexOf('@');
228: if (index == -1) {
229: log.error("WANPIPE.parseLinks(): local address " + src
230: + " must have a @ separator");
231: return false;
232: }
233: info.local_addr = src.substring(0, index);
234: info.local_port = Integer.parseInt(src.substring(index + 1,
235: src.length()));
236:
237: index = dst.indexOf('@');
238: if (index == -1) {
239: log.error("WANPIPE.parseLinks(): remote address " + dst
240: + " must have a @ separator");
241: return false;
242: }
243: info.remote_addr = dst.substring(0, index);
244: info.remote_port = Integer.parseInt(dst.substring(
245: index + 1, dst.length()));
246:
247: links.add(info);
248: }
249:
250: return true;
251: }
252:
253: Object getPeer() {
254: Object ret = null;
255: if (members == null || members.size() == 0
256: || local_addr == null)
257: return null;
258: for (int i = 0; i < members.size(); i++)
259: if (!members.elementAt(i).equals(local_addr))
260: return members.elementAt(i);
261: return ret;
262: }
263:
264: /**
265: If the sender is null, set our own address. We cannot just go ahead and set the address
266: anyway, as we might be sending a message on behalf of someone else ! E.g. in case of
267: retransmission, when the original sender has crashed, or in a FLUSH protocol when we
268: have to return all unstable messages with the FLUSH_OK response.
269: */
270: private void setSourceAddress(Message msg) {
271: if (msg.getSrc() == null)
272: msg.setSrc(local_addr);
273: }
274:
275: /** Send a message to the address specified in msg.dest */
276: private void sendUnicastMessage(Message msg) {
277: byte[] buf = null;
278:
279: setSourceAddress(msg);
280: try {
281: buf = Util.objectToByteBuffer(msg);
282: } catch (Exception e) {
283: log.error("WANPIPE.sendUnicastMessage(): " + e);
284: return;
285: }
286:
287: try {
288: pipe.send(buf);
289: } catch (LogicalLink.AllLinksDown links_down) {
290: log
291: .error("WANPIPE.sendUnicastMessage(): WAN pipe has no currently operational "
292: + "link to send message. Discarding it.");
293: } catch (LogicalLink.NoLinksAvailable no_links) {
294: log
295: .error("WANPIPE.sendUnicastMessage(): WAN pipe has no physical links configured;"
296: + " cannot send message");
297: } catch (Exception e) {
298: log.error("WANPIPE.sendUnicastMessage(): " + e);
299: }
300: }
301:
302: private void handleUpEvent(Event evt) {
303: switch (evt.getType()) {
304:
305: case Event.SUSPECT:
306: break;
307: }
308: }
309:
310: private void handleDownEvent(Event evt) {
311: switch (evt.getType()) {
312:
313: case Event.TMP_VIEW:
314: case Event.VIEW_CHANGE:
315: synchronized (members) {
316: members.removeAllElements();
317: Vector tmpvec = ((View) evt.getArg()).getMembers();
318: for (int i = 0; i < tmpvec.size(); i++)
319: members.addElement(tmpvec.elementAt(i));
320: }
321: break;
322:
323: case Event.SUSPECT:
324: break;
325:
326: case Event.GET_LOCAL_ADDRESS: // return local address -> Event(SET_LOCAL_ADDRESS, local)
327: passUp(new Event(Event.SET_LOCAL_ADDRESS, local_addr));
328: break;
329:
330: case Event.CONNECT:
331: group_addr = (String) evt.getArg();
332: passUp(new Event(Event.CONNECT_OK));
333: break;
334:
335: case Event.DISCONNECT:
336: passUp(new Event(Event.DISCONNECT_OK));
337: break;
338:
339: }
340: }
341:
342: private static class LinkInfo {
343: String local_addr = null, remote_addr = null;
344: int local_port = 0, remote_port = 0;
345:
346: public String toString() {
347: StringBuffer ret = new StringBuffer();
348:
349: ret.append("local_addr="
350: + (local_addr != null ? local_addr : "null"));
351: ret.append(":" + local_port);
352: ret.append(", remote_addr="
353: + (remote_addr != null ? remote_addr : "null"));
354: ret.append(":" + remote_port);
355: return ret.toString();
356: }
357: }
358:
359: public class WanPipeHeader extends Header {
360: public String group_addr = null;
361:
362: public WanPipeHeader() {
363: } // used for externalization
364:
365: public WanPipeHeader(String n) {
366: group_addr = n;
367: }
368:
369: public long size() {
370: return Header.HDR_OVERHEAD;
371: }
372:
373: public String toString() {
374: return "[WanPipe: group_addr=" + group_addr + ']';
375: }
376:
377: public void writeExternal(ObjectOutput out) throws IOException {
378: out.writeObject(group_addr);
379: }
380:
381: public void readExternal(ObjectInput in) throws IOException,
382: ClassNotFoundException {
383: group_addr = (String) in.readObject();
384: }
385:
386: }
387:
388: }
|