001: package vicazh.hyperpool.stream.net;
002:
003: import java.io.*;
004: import java.net.*;
005: import java.util.logging.*;
006: import vicazh.hyperpool.*;
007: import vicazh.hyperpool.Start;
008: import vicazh.hyperpool.stream.*;
009: import vicazh.hyperpool.stream.Element;
010: import vicazh.hyperpool.stream.Connection;
011:
012: /**
013: * The UDP in service
014: *
015: * @author Victor Zhigunov
016: * @version 0.4.0
017: */
018: public class UDPInService extends Service implements Runnable,
019: UDPInServiceMBean {
020: public UDPInService() {
021: }
022:
023: private String msg;
024:
025: private int size;
026:
027: /**
028: * @param msg
029: * port use message
030: * @param size
031: * packet size
032: */
033: public UDPInService(String msg, int size) {
034: this .msg = msg;
035: this .size = size;
036: }
037:
038: public DatagramSocket socket;
039:
040: private int port;
041:
042: public void setPort(int port) {
043: this .port = port;
044: if (msg != null) {
045: if (socket != null) {
046: socket.close();
047: socket = null;
048: }
049: try {
050: socket = new DatagramSocket(port);
051: new Thread(this ).start();
052: } catch (SocketException e) {
053: System.out.println(msg);
054: }
055: }
056: }
057:
058: public int getPort() {
059: return port;
060: }
061:
062: public void run() {
063: try {
064: while (!socket.isClosed()) {
065: byte[] buf = new byte[size];
066: DatagramPacket packet = new DatagramPacket(buf,
067: buf.length);
068: socket.receive(packet);
069: new Trans(packet).start();
070: }
071: } catch (Exception e) {
072: }
073: }
074:
075: private class Trans extends Thread {
076: private DatagramPacket packet;
077:
078: public Trans(DatagramPacket packet) {
079: this .packet = packet;
080: }
081:
082: public void run() {
083: try {
084: InetAddress a = packet.getAddress();
085: int p = packet.getPort();
086: UDPInConnection connection = null;
087: for (Connection c : getConnections())
088: if (((UDPInConnection) c).address.equals(a)
089: && ((UDPInConnection) c).port == p) {
090: connection = (UDPInConnection) c;
091: break;
092: }
093: if (connection == null) {
094: connection = new UDPInConnection(UDPInService.this ,
095: a, p);
096: connection.setServer(new ByteArrayOutputStream());
097: connection.setClient(((Element) getElement())
098: .get(connection.getServer()));
099: add(connection);
100: }
101: synchronized (connection) {
102: Transfer.run(packet, connection.getClient());
103: }
104: } catch (BreakException e) {
105: } catch (Exception e) {
106: Start.logger.log(Level.SEVERE, e.getMessage(), e);
107: }
108: }
109: }
110:
111: public void stop() throws Exception {
112: try {
113: socket.close();
114: } catch (Exception e) {
115: }
116: super .stop();
117: }
118:
119: public void setAttribute(String name, Object value)
120: throws Exception {
121: if (name.equals(ElementMBean.INIT))
122: super .setAttribute(name, InetAddress.getLocalHost()
123: .getHostAddress());
124: else {
125: setPort((Integer) value);
126: super .setAttribute(name, value);
127: if (socket == null)
128: throw (new SocketException());
129: }
130: }
131: }
|