001: package org.xsocket.connection;
002:
003: import java.io.File;
004: import java.io.IOException;
005: import java.io.RandomAccessFile;
006: import java.lang.management.ManagementFactory;
007: import java.net.InetAddress;
008: import java.nio.BufferUnderflowException;
009: import java.nio.ByteBuffer;
010: import java.nio.channels.FileChannel;
011: import java.rmi.registry.LocateRegistry;
012: import java.rmi.registry.Registry;
013: import java.util.HashMap;
014: import java.util.Map;
015: import java.util.concurrent.Executors;
016:
017: import javax.management.MBeanServer;
018: import javax.management.remote.JMXConnectorServer;
019: import javax.management.remote.JMXConnectorServerFactory;
020: import javax.management.remote.JMXServiceURL;
021:
022: import org.xsocket.MaxReadSizeExceededException;
023:
024: public final class BulkDownloadServer {
025:
026: public static final String TEMP_FILE_EXTENSION = "xtmp";
027:
028: public static final String DOWNLOAD_REQUEST = "down";
029: public static final String PREPARE_DOWNLOAD = "prepareDown";
030: public static final String OK = "OK";
031: public static final String DELIMITER = "\r\n";
032:
033: public static void main(String[] args) throws Exception {
034:
035: if (args.length < 2) {
036: System.out
037: .println("usage org.xsocket.stream.BulkDownloadServer <port> <ReadFromFile> <workers>");
038: System.exit(-1);
039: }
040:
041: new BulkDownloadServer().launch(args);
042: }
043:
044: public void launch(String... args) throws Exception {
045:
046: int port = Integer.parseInt(args[0]);
047: boolean readFromFile = Boolean.parseBoolean(args[1]);
048: Server server = null;
049:
050: if (args.length > 2) {
051: int workers = Integer.parseInt(args[2]);
052: server = new Server(InetAddress.getByName("xp-lp-grro2"),
053: port, new Handler(readFromFile));
054: server.setWorkerpool(Executors.newFixedThreadPool(workers));
055:
056: System.out.println("using worker pool with maxSize="
057: + workers + " (readFromFile=" + readFromFile + ")");
058:
059: } else {
060: server = new Server("localhost", port, new Handler(
061: readFromFile));
062: System.out
063: .println("using default worker pool (readFromFile="
064: + readFromFile + ")");
065: }
066:
067: Thread t = new Thread(server);
068: t.setName("xAcceptor");
069:
070: startJmxServer("test", 1899);
071: ConnectionUtils.registerMBean(server);
072:
073: System.out.println("running BulkDownloadServer");
074: t.start();
075:
076: final IServer srv = server;
077: Runtime.getRuntime().addShutdownHook(new Thread() {
078: @Override
079: public void run() {
080: try {
081: srv.close();
082: } catch (Exception e) {
083: e.printStackTrace();
084: }
085:
086: }
087: });
088:
089: while (true) {
090: try {
091: Thread.sleep(5000);
092: } catch (InterruptedException ignore) {
093: }
094: }
095: }
096:
097: private static void startJmxServer(String name, int port) {
098: try {
099: Registry registry = LocateRegistry.createRegistry(port);
100: registry.unbind(name);
101: } catch (Exception ignore) {
102: }
103:
104: try {
105: JMXServiceURL url = new JMXServiceURL(
106: "service:jmx:rmi:///jndi/rmi://"
107: + InetAddress.getLocalHost().getHostName()
108: + ":" + port + "/" + name);
109:
110: MBeanServer mbeanSrv = ManagementFactory
111: .getPlatformMBeanServer();
112: JMXConnectorServer server = JMXConnectorServerFactory
113: .newJMXConnectorServer(url, null, mbeanSrv);
114: server.start();
115: System.out
116: .println("JMX RMI Agent has been bound on address");
117: System.out.println(url);
118: } catch (Exception e) {
119: e.printStackTrace();
120: }
121: }
122:
123: private static final class Handler implements IDataHandler {
124:
125: private boolean readFromFile = true;
126:
127: public Handler(boolean readFromFile) {
128: this .readFromFile = readFromFile;
129: }
130:
131: public boolean getReadFromFile() {
132: return readFromFile;
133: }
134:
135: public void setReadFromFile(boolean b) {
136: this .readFromFile = b;
137: }
138:
139: @Override
140: public boolean onData(INonBlockingConnection connection)
141: throws IOException, BufferUnderflowException,
142: MaxReadSizeExceededException {
143:
144: String cmd = connection.readStringByDelimiter(DELIMITER);
145:
146: if (cmd.startsWith(PREPARE_DOWNLOAD)) {
147: Integer size = Integer.parseInt(cmd.substring(
148: PREPARE_DOWNLOAD.length(), cmd.length()));
149:
150: if (readFromFile) {
151: File file = File.createTempFile("test",
152: TEMP_FILE_EXTENSION);
153: file.deleteOnExit();
154:
155: FileChannel fc = new RandomAccessFile(file, "rw")
156: .getChannel();
157: fc.write(QAUtil.generateDirectByteBuffer(size));
158: fc.close();
159:
160: connection.setAttachment(file);
161: } else {
162: connection.setAttachment(size);
163: }
164: connection.write(OK + DELIMITER);
165:
166: } else if (cmd.startsWith(DOWNLOAD_REQUEST)) {
167: if (readFromFile) {
168: try {
169: File file = (File) connection.getAttachment();
170: FileChannel fc = new RandomAccessFile(file, "r")
171: .getChannel();
172: connection.transferFrom(fc);
173: fc.close();
174: connection.write(DELIMITER);
175:
176: } catch (Exception e) {
177: e.printStackTrace();
178: }
179:
180: } else {
181: Integer size = (Integer) connection.getAttachment();
182: connection.write(QAUtil
183: .generateDirectByteBuffer(size));
184: connection.write(DELIMITER);
185: }
186: }
187:
188: return true;
189: }
190: }
191: }
|