001: // $Id: LogicalLink.java,v 1.5 2005/05/30 16:14:34 belaban Exp $
002:
003: package org.jgroups.blocks;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007:
008: import java.io.BufferedReader;
009: import java.io.InputStreamReader;
010: import java.net.InetAddress;
011: import java.util.Vector;
012:
013: /**
014: * Implements a logical point-to-point link between 2 entities consisting of a number of physical links.
015: * Traffic is routed over any of the physical link, according to policies. Examples are: send traffic
016: * over all links, round-robin, use first link for 70% of traffic, other links for the remaining 30%.
017: *
018: * @author Bela Ban, June 2000
019: */
020: public class LogicalLink implements Link.Receiver {
021: Receiver receiver = null;
022: final Vector links = new Vector(); // of Links
023: final int link_to_use = 0;
024: Log log = LogFactory.getLog(getClass());
025:
026: public class NoLinksAvailable extends Exception {
027: public String toString() {
028: return "LogicalLinks.NoLinksAvailable: there are no physical links available";
029: }
030: }
031:
032: public class AllLinksDown extends Exception {
033: public String toString() {
034: return "LogicalLinks.AllLinksDown: all physical links are currently down";
035: }
036: }
037:
038: public interface Receiver {
039: void receive(byte[] buf);
040:
041: void linkDown(InetAddress local, int local_port,
042: InetAddress remote, int remote_port);
043:
044: void linkUp(InetAddress local, int local_port,
045: InetAddress remote, int remote_port);
046:
047: void missedHeartbeat(InetAddress local, int local_port,
048: InetAddress remote, int remote_port, int num_hbs);
049:
050: void receivedHeartbeatAgain(InetAddress local, int local_port,
051: InetAddress remote, int remote_port);
052: }
053:
054: public LogicalLink(Receiver r) {
055: receiver = r;
056:
057: }
058:
059: public LogicalLink() {
060:
061: }
062:
063: public void addLink(String local_addr, int local_port,
064: String remote_addr, int remote_port) {
065: Link new_link = new Link(local_addr, local_port, remote_addr,
066: remote_port, this );
067: if (links.contains(new_link))
068: log.error("LogicalLink.add(): link " + new_link
069: + " is already present");
070: else
071: links.addElement(new_link);
072: }
073:
074: public void addLink(String local_addr, int local_port,
075: String remote_addr, int remote_port, long timeout,
076: long hb_interval) {
077: Link new_link = new Link(local_addr, local_port, remote_addr,
078: remote_port, timeout, hb_interval, this );
079: if (links.contains(new_link))
080: log.error("LogicalLink.add(): link " + new_link
081: + " is already present");
082: else
083: links.addElement(new_link);
084: }
085:
086: public void removeAllLinks() {
087: Link tmp;
088: for (int i = 0; i < links.size(); i++) {
089: tmp = (Link) links.elementAt(i);
090: tmp.stop();
091: }
092: links.removeAllElements();
093: }
094:
095: public Vector getLinks() {
096: return links;
097: }
098:
099: public int numberOfLinks() {
100: return links.size();
101: }
102:
103: public int numberOfEstablishedLinks() {
104: int n = 0;
105:
106: for (int i = 0; i < links.size(); i++) {
107: if (((Link) links.elementAt(i)).established())
108: n++;
109: }
110: return n;
111: }
112:
113: /**
114: * Start all links
115: */
116: public void start() {
117: Link tmp;
118: for (int i = 0; i < links.size(); i++) {
119: tmp = (Link) links.elementAt(i);
120: try {
121: tmp.start();
122: } catch (Exception ex) {
123: log
124: .error("LogicalLink.start(): could not create physical link, reason: "
125: + ex);
126: }
127: }
128: }
129:
130: /**
131: * Stop all links
132: */
133: public void stop() {
134: Link tmp;
135: for (int i = 0; i < links.size(); i++) {
136: tmp = (Link) links.elementAt(i);
137: tmp.stop();
138: }
139: }
140:
141: /**
142: * Send a message to the other side
143: */
144: public boolean send(byte[] buf) throws AllLinksDown,
145: NoLinksAvailable {
146: Link link;
147: int link_used = 0;
148:
149: if (buf == null || buf.length == 0) {
150: log.error("LogicalLink.send(): buf is null or empty");
151: return false;
152: }
153:
154: if (links.size() == 0)
155: throw new NoLinksAvailable();
156:
157: // current policy (make policies configurable later !): alternate between links.
158: // if failure, take first link that works
159: // link=(Link)links.elementAt(link_to_use);
160: // if(link.send(buf)) {
161: // System.out.println("Send over link #" + link_to_use + ": " + link);
162: // link_to_use=(link_to_use + 1) % links.size();
163: // return true;
164: // }
165:
166: // link_used=(link_to_use + 1) % links.size();
167: // while(link_used != link_to_use) {
168: // link=(Link)links.elementAt(link_used);
169: // if(link.send(buf)) {
170: // System.out.println("Send over link #" + link_used + ": " + link);
171: // link_to_use=(link_to_use + 1) % links.size();
172: // return true;
173: // }
174: // link_used=(link_used + 1) % links.size();
175: // }
176:
177: // take first available link. use other links only if first is down. if we have smaller and bigger
178: // pipes, the bigger ones should be specified first (so we're using them first, and only when they
179: // are not available we use the smaller ones)
180: for (int i = 0; i < links.size(); i++) {
181: link = (Link) links.elementAt(i);
182: if (link.established()) {
183: if (link.send(buf)) {
184: System.out.println("Send over link #" + link_used
185: + ": " + link);
186: return true;
187: }
188: }
189: }
190:
191: throw new AllLinksDown();
192: }
193:
194: public void setReceiver(Receiver r) {
195: receiver = r;
196: }
197:
198: /*-------- Interface Link.Receiver ---------*/
199:
200: /**
201: * Receive a message from any of the physical links. That's why this and the next methods have to be
202: * synchronized
203: */
204: public synchronized void receive(byte[] buf) {
205: if (receiver != null)
206: receiver.receive(buf);
207: }
208:
209: /**
210: * One of the physical links went down
211: */
212: public synchronized void linkDown(InetAddress local,
213: int local_port, InetAddress remote, int remote_port) {
214: if (receiver != null)
215: receiver.linkDown(local, local_port, remote, remote_port);
216: }
217:
218: /**
219: * One of the physical links came up
220: */
221: public synchronized void linkUp(InetAddress local, int local_port,
222: InetAddress remote, int remote_port) {
223: if (receiver != null)
224: receiver.linkUp(local, local_port, remote, remote_port);
225: }
226:
227: /**
228: * Missed one or more heartbeats. Link is not yet down, though
229: */
230: public synchronized void missedHeartbeat(InetAddress local,
231: int local_port, InetAddress remote, int remote_port,
232: int num_missed_hbs) {
233: if (receiver != null)
234: receiver.missedHeartbeat(local, local_port, remote,
235: remote_port, num_missed_hbs);
236: }
237:
238: /**
239: * Heartbeat came back again (before link was taken down) after missing some heartbeats
240: */
241: public synchronized void receivedHeartbeatAgain(InetAddress local,
242: int local_port, InetAddress remote, int remote_port) {
243: if (receiver != null)
244: receiver.receivedHeartbeatAgain(local, local_port, remote,
245: remote_port);
246: }
247:
248: private static class MyReceiver implements LogicalLink.Receiver {
249:
250: public void receive(byte[] buf) {
251: System.out.println("<-- " + new String(buf));
252: }
253:
254: /**
255: * All of the physical links are down --> logical link is down too
256: */
257: public synchronized void linkDown(InetAddress l, int lp,
258: InetAddress r, int rp) {
259: System.out.println("** linkDown(): " + r + ':' + rp);
260: }
261:
262: /**
263: * At least 1 physical links is up again
264: */
265: public synchronized void linkUp(InetAddress l, int lp,
266: InetAddress r, int rp) {
267: System.out.println("** linkUp(): " + r + ':' + rp);
268: }
269:
270: public synchronized void missedHeartbeat(InetAddress l, int lp,
271: InetAddress r, int rp, int num) {
272: //System.out.println("missedHeartbeat(): " + r + ":" + rp);
273: }
274:
275: public synchronized void receivedHeartbeatAgain(InetAddress l,
276: int lp, InetAddress r, int rp) {
277: //System.out.println("receivedHeartbeatAgain(): " + r + ":" + rp);
278: }
279:
280: }
281:
282: public static void main(String[] args) {
283: LogicalLink ll = new LogicalLink();
284: String local_host, remote_host;
285: int local_port, remote_port;
286: int i = 0;
287:
288: ll.setReceiver(new MyReceiver());
289:
290: if (args.length % 4 != 0 || args.length == 0) {
291: System.err
292: .println("\nLogicalLink <link+>\nwhere <link> is "
293: + "<local host> <local port> <remote host> <remote port>\n");
294: return;
295: }
296:
297: while (i < args.length) {
298: local_host = args[i++];
299: local_port = Integer.parseInt(args[i++]);
300: remote_host = args[i++];
301: remote_port = Integer.parseInt(args[i++]);
302: ll
303: .addLink(local_host, local_port, remote_host,
304: remote_port);
305: }
306:
307: try {
308: ll.start();
309: } catch (Exception e) {
310: System.err.println("LogicalLink.main(): " + e);
311: }
312:
313: BufferedReader in = new BufferedReader(new InputStreamReader(
314: System.in));
315: while (true) {
316: try {
317: System.out.print("> ");
318: System.out.flush();
319: String line = in.readLine();
320: ll.send(line.getBytes());
321: } catch (Exception e) {
322: System.err.println(e);
323: }
324: }
325:
326: }
327:
328: }
|