001: // $Id: TimedWriter.java,v 1.6 2006/01/19 09:53:39 belaban Exp $
002:
003: package org.jgroups.util;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007:
008: import java.io.DataOutputStream;
009: import java.io.IOException;
010: import java.io.OutputStream;
011: import java.net.InetAddress;
012: import java.net.Socket;
013:
014: /**
015: Waits until the buffer has been written to the output stream, or until timeout msecs have elapsed,
016: whichever comes first.
017: TODO: make it more generic, so all sorts of timed commands should be executable. Including return
018: values, exceptions and Timeout exception. Also use ReusableThread instead of creating a new threa
019: each time.
020:
021: @author Bela Ban
022: */
023:
024: public class TimedWriter {
025: Thread thread = null;
026: long timeout = 2000;
027: boolean completed = true;
028: Exception write_ex = null;
029: Socket sock = null;
030: static Log log = LogFactory.getLog(TimedWriter.class);
031:
032: static class Timeout extends Exception {
033: public String toString() {
034: return "TimedWriter.Timeout";
035: }
036: }
037:
038: class WriterThread extends Thread {
039: DataOutputStream out = null;
040: byte[] buf = null;
041: int i = 0;
042:
043: public WriterThread(OutputStream out, byte[] buf) {
044: super (Util.getGlobalThreadGroup(),
045: "TimedWriter.WriterThread");
046: this .out = new DataOutputStream(out);
047: this .buf = buf;
048: }
049:
050: public WriterThread(OutputStream out, int i) {
051: super (Util.getGlobalThreadGroup(),
052: "TimedWriter.WriterThread");
053: this .out = new DataOutputStream(out);
054: this .i = i;
055: }
056:
057: public void run() {
058: try {
059: if (buf != null)
060: out.write(buf);
061: else {
062: out.writeInt(i);
063: }
064:
065: } catch (IOException e) {
066: write_ex = e;
067: }
068: completed = true;
069: }
070: }
071:
072: class SocketCreator extends Thread {
073: InetAddress local = null, remote = null;
074: int peer_port = 0;
075:
076: public SocketCreator(InetAddress local, InetAddress remote,
077: int peer_port) {
078: this .local = local;
079: this .remote = remote;
080: this .peer_port = peer_port;
081: }
082:
083: public void run() {
084: completed = false;
085: sock = null;
086:
087: try {
088: sock = new Socket(remote, peer_port, local, 0); // 0 means choose any port
089: } catch (IOException io_ex) {
090: write_ex = io_ex;
091: }
092: completed = true;
093: }
094: }
095:
096: void start(InetAddress local, InetAddress remote, int peer_port) {
097: stop();
098: thread = new SocketCreator(local, remote, peer_port);
099: thread.start();
100: }
101:
102: void start(OutputStream out, byte[] buf) {
103: stop();
104: thread = new WriterThread(out, buf);
105: thread.start();
106: }
107:
108: void start(OutputStream out, int i) {
109: stop();
110: thread = new WriterThread(out, i);
111: thread.start();
112: }
113:
114: void stop() {
115: if (thread != null && thread.isAlive()) {
116: thread.interrupt();
117: try {
118: thread.join(timeout);
119: } catch (Exception e) {
120: }
121: }
122: }
123:
124: /**
125: Writes data to an output stream. If the method does not return within timeout milliseconds,
126: a Timeout exception will be thrown.
127: */
128: public synchronized void write(OutputStream out, byte[] buf,
129: long timeout) throws Exception, Timeout,
130: InterruptedException {
131: if (out == null || buf == null) {
132: log
133: .error("TimedWriter.write(): output stream or buffer is null, ignoring write");
134: return;
135: }
136:
137: try {
138: this .timeout = timeout;
139: completed = false;
140: start(out, buf);
141: if (thread == null)
142: return;
143:
144: thread.join(timeout);
145:
146: if (completed == false) {
147: throw new Timeout();
148: }
149: if (write_ex != null) {
150: Exception tmp = write_ex;
151: write_ex = null;
152: throw tmp;
153: }
154: } finally { // stop the thread in any case
155: stop();
156: }
157: }
158:
159: public synchronized void write(OutputStream out, int i, long timeout)
160: throws Exception, Timeout, InterruptedException {
161: if (out == null) {
162: log
163: .error("TimedWriter.write(): output stream is null, ignoring write");
164: return;
165: }
166:
167: try {
168: this .timeout = timeout;
169: completed = false;
170: start(out, i);
171: if (thread == null)
172: return;
173:
174: thread.join(timeout);
175: if (completed == false) {
176: throw new Timeout();
177: }
178: if (write_ex != null) {
179: Exception tmp = write_ex;
180: write_ex = null;
181: throw tmp;
182: }
183: } finally { // stop the thread in any case
184: stop();
185: }
186: }
187:
188: /** Tries to create a socket to remote_peer:remote_port. If not sucessful within timeout
189: milliseconds, throws the Timeout exception. Otherwise, returns the socket or throws an
190: IOException. */
191: public synchronized Socket createSocket(InetAddress local,
192: InetAddress remote, int port, long timeout)
193: throws Exception, Timeout, InterruptedException {
194:
195: try {
196: this .timeout = timeout;
197: completed = false;
198: start(local, remote, port);
199: if (thread == null)
200: return null;
201:
202: thread.join(timeout);
203: if (completed == false) {
204: throw new Timeout();
205: }
206: if (write_ex != null) {
207: Exception tmp = write_ex;
208: write_ex = null;
209: throw tmp;
210: }
211: return sock;
212: } finally { // stop the thread in any case
213: stop();
214: }
215: }
216:
217: public static void main(String[] args) {
218: TimedWriter w = new TimedWriter();
219: InetAddress local = null;
220: InetAddress remote = null;
221: int port = 0;
222: Socket sock = null;
223:
224: if (args.length != 3) {
225: log
226: .error("TimedWriter <local host> <remote host> <remote port>");
227: return;
228: }
229:
230: try {
231: local = InetAddress.getByName(args[0]);
232: remote = InetAddress.getByName(args[1]);
233: port = Integer.parseInt(args[2]);
234: } catch (Exception e) {
235: log.error("Could find host " + remote);
236: return;
237: }
238:
239: while (true) {
240:
241: try {
242: sock = w.createSocket(local, remote, port, 3000);
243: if (sock != null) {
244: System.out.println("Connection created");
245: return;
246: }
247: } catch (TimedWriter.Timeout t) {
248: log.error("Timed out creating socket");
249: } catch (Exception io_ex) {
250: log.error("Connection could not be created, retrying");
251: Util.sleep(2000);
252: }
253: }
254:
255: }
256: }
|