001: package pygmy.core;
002:
003: import java.io.*;
004: import java.nio.channels.*;
005: import java.nio.ByteBuffer;
006: import java.net.InetSocketAddress;
007: import java.util.Iterator;
008: import java.util.logging.Logger;
009: import java.util.logging.Level;
010: import java.util.logging.LogRecord;
011:
012: public class SingleThreadedHttpEndPoint implements EndPoint, Runnable {
013: private static Logger log = Logger
014: .getLogger(SingleThreadedHttpEndPoint.class.getName());
015:
016: private static final ConfigOption PORT_OPTION = new ConfigOption(
017: "port", "80", "HTTP server port");
018: private static final ConfigOption BUFFER_SIZE_OPTION = new ConfigOption(
019: "buffersize", "1024", "Read buffer size.");
020:
021: private String endpointName;
022: private Server server;
023: private ByteBuffer byteBuffer;
024: private Thread mainThread;
025: private int socketPort = 80;
026:
027: public void initialize(String name, Server server)
028: throws IOException {
029: this .endpointName = name;
030: this .server = server;
031: try {
032: socketPort = PORT_OPTION.getInteger(server, endpointName)
033: .intValue();
034: } catch (NumberFormatException e) {
035: }
036: int size = 1024;
037: try {
038: size = BUFFER_SIZE_OPTION.getInteger(server, endpointName)
039: .intValue();
040: } catch (NumberFormatException e) {
041: }
042: byteBuffer = ByteBuffer.allocateDirect(size);
043: }
044:
045: public String getName() {
046: return endpointName;
047: }
048:
049: public void start() {
050: mainThread = new Thread(this , endpointName + "[" + socketPort
051: + "] ServerSocketEndPoint");
052: mainThread.setDaemon(true);
053: mainThread.start();
054: }
055:
056: public void run() {
057: Selector selector = null;
058: try {
059: selector = createSelector(socketPort);
060: boolean keepProcessing = true;
061: while (keepProcessing) {
062: keepProcessing = processIncomingConnections(selector);
063: }
064: } catch (IOException e) {
065: logException(Level.SEVERE, e);
066: } finally {
067: if (selector != null) {
068: try {
069: selector.close();
070: } catch (IOException ignore) {
071: }
072: }
073: mainThread = null;
074: }
075: }
076:
077: private boolean processIncomingConnections(Selector selector) {
078: try {
079: selector.select();
080: if (Thread.currentThread().isInterrupted()) {
081: return false;
082: }
083: Iterator it = selector.selectedKeys().iterator();
084: while (it.hasNext()) {
085: SelectionKey key = (SelectionKey) it.next();
086: try {
087: handleKey(selector, key);
088: } catch (IOException ioe) {
089: logException(Level.WARNING, ioe);
090: ((DirectionalTransfer) key.attachment())
091: .closeClient();
092: } finally {
093: it.remove();
094: }
095: }
096: } catch (Exception e) {
097: logException(Level.SEVERE, e);
098: }
099: return true;
100: }
101:
102: private void logException(Level logLevel, Throwable e) {
103: LogRecord record = new LogRecord(logLevel, e.getMessage());
104: record.setThrown(e);
105: log.log(record);
106: }
107:
108: public void shutdown(Server server) {
109: if (mainThread != null) {
110: mainThread.interrupt();
111: }
112: }
113:
114: private Selector createSelector(int port) throws IOException {
115: ServerSocketChannel serverChannel = ServerSocketChannel.open();
116: Selector selector = Selector.open();
117: serverChannel.socket().bind(new InetSocketAddress(port));
118: serverChannel.configureBlocking(false);
119: serverChannel.register(selector, SelectionKey.OP_ACCEPT);
120: return selector;
121: }
122:
123: private void handleKey(Selector selector, SelectionKey key)
124: throws IOException {
125: if (key.isAcceptable()) {
126: acceptNewClient(selector, key);
127: } else if (key.isReadable()) {
128: readDataFromSocket(key);
129: }
130: }
131:
132: private void acceptNewClient(Selector selector, SelectionKey key)
133: throws IOException {
134: ServerSocketChannel serverChannel = (ServerSocketChannel) key
135: .channel();
136: SocketChannel channel = serverChannel.accept();
137: channel.configureBlocking(false);
138: Client client = new Client(channel);
139: client.out.source().register(selector, SelectionKey.OP_READ,
140: client.getTransferToSocket());
141: channel.register(selector, SelectionKey.OP_READ, client
142: .getTransferToWorker());
143: server.post(new NonBlockingRunnable(server, channel.socket(),
144: client.getTransferToWorker(), client
145: .getTransferToSocket()));
146: }
147:
148: private void readDataFromSocket(SelectionKey key)
149: throws IOException {
150: try {
151: int count = ((ReadableByteChannel) key.channel())
152: .read(byteBuffer);
153: if (count > 0) {
154: byteBuffer.flip();
155: DirectionalTransfer direction = (DirectionalTransfer) key
156: .attachment();
157: direction.transfer(byteBuffer);
158: } else if (count < 0) {
159: ((DirectionalTransfer) key.attachment()).closeClient();
160: }
161: } finally {
162: byteBuffer.clear();
163: }
164: }
165:
166: public interface DirectionalTransfer {
167: public void transfer(ByteBuffer data) throws IOException;
168:
169: public void closeClient() throws IOException;
170: }
171:
172: public static class Client {
173: SocketChannel channel;
174: Pipe in;
175: Pipe out;
176: TransferToSocket socketTransfer;
177: TransferToWorker workerTransfer;
178:
179: public Client(SocketChannel aChannel) throws IOException {
180: this .channel = aChannel;
181: in = Pipe.open();
182: out = Pipe.open();
183: in.sink().configureBlocking(false);
184: out.source().configureBlocking(false);
185: socketTransfer = new TransferToSocket(this );
186: workerTransfer = new TransferToWorker(this );
187: }
188:
189: public TransferToSocket getTransferToSocket() {
190: return socketTransfer;
191: }
192:
193: public TransferToWorker getTransferToWorker() {
194: return workerTransfer;
195: }
196: }
197:
198: public static class TransferToWorker extends InputStream implements
199: DirectionalTransfer {
200: Client client;
201:
202: public TransferToWorker(Client client) {
203: this .client = client;
204: }
205:
206: public int read(byte b[]) throws IOException {
207: return read(b, 0, b.length);
208: }
209:
210: public int read(byte b[], int off, int len) throws IOException {
211: ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
212: return client.in.source().read(buffer);
213: }
214:
215: public int read() throws IOException {
216: byte[] byte1 = new byte[1];
217: int count = 0;
218: while (count == 0) {
219: count = read(byte1);
220: }
221:
222: if (count > 0) {
223: return (int) byte1[0];
224: } else {
225: return count;
226: }
227: }
228:
229: public void close() throws IOException {
230: // client.in.sink().close();
231: }
232:
233: public void transfer(ByteBuffer data) throws IOException {
234: int count = client.in.sink().write(data);
235: if (count == 0 || data.hasRemaining()) {
236: System.out.println("Count: " + count + " remaing: "
237: + data.hasRemaining());
238: }
239: }
240:
241: public void closeClient() throws IOException {
242: client.channel.close();
243: client.in.sink().close();
244: client.out.source().close();
245: }
246: }
247:
248: public static class TransferToSocket extends OutputStream implements
249: DirectionalTransfer {
250: Client client;
251: byte[] byte1;
252:
253: public TransferToSocket(Client client) {
254: this .client = client;
255: }
256:
257: public void transfer(ByteBuffer data) throws IOException {
258: int written = client.channel.write(data);
259: if (written == 0) {
260: System.out.println("Written to socket: " + written);
261: }
262: }
263:
264: public void closeClient() throws IOException {
265: client.in.source().close();
266: client.out.sink().close();
267: }
268:
269: public void write(int b) throws IOException {
270: if (byte1 == null) {
271: byte1 = new byte[1];
272: }
273: byte1[0] = (byte) b;
274: client.out.sink().write(ByteBuffer.wrap(byte1));
275: }
276:
277: public void write(byte b[]) throws IOException {
278: write(b, 0, b.length);
279: }
280:
281: public void write(byte b[], int off, int len)
282: throws IOException {
283: ByteBuffer buffer = ByteBuffer.wrap(b, off, len);
284: while (buffer.hasRemaining()) {
285: int written = client.out.sink().write(buffer);
286: if (written == 0) {
287: Thread.yield();
288: }
289: }
290: }
291:
292: public void close() throws IOException {
293: // client.out.sink().close();
294: }
295: }
296: }
|