001: /*
002: * Copyright (c) xsocket.org, 2006 - 2008. All rights reserved.
003: *
004: * This library is free software; you can redistribute it and/or
005: * modify it under the terms of the GNU Lesser General Public
006: * License as published by the Free Software Foundation; either
007: * version 2.1 of the License, or (at your option) any later version.
008: *
009: * This library is distributed in the hope that it will be useful,
010: * but WITHOUT ANY WARRANTY; without even the implied warranty of
011: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
012: * Lesser General Public License for more details.
013: *
014: * You should have received a copy of the GNU Lesser General Public
015: * License along with this library; if not, write to the Free Software
016: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
017: *
018: * Please refer to the LGPL license at: http://www.gnu.org/copyleft/lesser.txt
019: * The latest copy of this software may be found on http://www.xsocket.org/
020: */
021: package org.xsocket.connection.http;
022:
023: import java.io.IOException;
024: import java.nio.BufferUnderflowException;
025: import java.nio.ByteBuffer;
026: import java.nio.channels.WritableByteChannel;
027: import java.util.HashMap;
028: import java.util.Map;
029: import java.util.Map.Entry;
030:
031: import org.junit.Assert;
032: import org.junit.Test;
033:
034: import org.xsocket.DataConverter;
035: import org.xsocket.Execution;
036: import org.xsocket.connection.IServer;
037: import org.xsocket.connection.ConnectionUtils;
038: import org.xsocket.connection.http.BodyDataSink;
039: import org.xsocket.connection.http.HttpRequest;
040: import org.xsocket.connection.http.HttpRequestHeader;
041: import org.xsocket.connection.http.HttpResponse;
042: import org.xsocket.connection.http.HttpResponseHeader;
043: import org.xsocket.connection.http.client.HttpClientConnection;
044: import org.xsocket.connection.http.client.IHttpResponseHandler;
045: import org.xsocket.connection.http.server.HttpServer;
046: import org.xsocket.connection.http.server.HttpServerConnection;
047: import org.xsocket.connection.http.server.IHttpResponseContext;
048: import org.xsocket.connection.http.server.IHttpRequestHandler;
049:
050: /**
051: *
052: * @author grro@xsocket.org
053: */
054: public final class MulticastTest {
055:
056: @Test
057: public void testSimple() throws Exception {
058: MulticastService multicastService = new MulticastService();
059:
060: IServer server = new HttpServer(new MulticastServerHandler(
061: multicastService));
062: server.setIdleTimeoutMillis(3 * 60 * 1000);
063: ConnectionUtils.start(server);
064:
065: HttpClientConnection con1 = new HttpClientConnection(
066: "localhost", server.getLocalPort());
067: HttpRequestHeader header1 = new HttpRequestHeader("POST",
068: "/mutlicast");
069: MulticastClientHandler receiveChannel1 = new MulticastClientHandler();
070: BodyDataSink sendChannel1 = con1.send(header1, receiveChannel1);
071: sendChannel1.setAutoflush(false);
072: sendChannel1.flush();
073:
074: QAUtil.sleep(200);
075: Assert.assertEquals(1, multicastService.getCountPeers());
076:
077: send(sendChannel1, "1");
078: sendChannel1.flush();
079:
080: int length = receiveChannel1.readInt();
081: String data = receiveChannel1.readStringByLength(length);
082: Assert.assertEquals("1", data);
083:
084: HttpClientConnection con2 = new HttpClientConnection(
085: "localhost", server.getLocalPort());
086: HttpRequestHeader header2 = new HttpRequestHeader("POST",
087: "/mutlicast");
088: MulticastClientHandler receiveChannel2 = new MulticastClientHandler();
089: BodyDataSink sendChannel2 = con2.send(header2, receiveChannel2);
090: sendChannel2.setAutoflush(false);
091: sendChannel2.flush();
092:
093: QAUtil.sleep(200);
094: Assert.assertEquals(2, multicastService.getCountPeers());
095:
096: send(sendChannel1, "2");
097: sendChannel1.flush();
098:
099: length = receiveChannel1.readInt();
100: data = receiveChannel1.readStringByLength(length);
101: Assert.assertEquals("2", data);
102:
103: length = receiveChannel2.readInt();
104: data = receiveChannel2.readStringByLength(length);
105: Assert.assertEquals("2", data);
106:
107: con1.close();
108:
109: QAUtil.sleep(200);
110: Assert.assertEquals(1, multicastService.getCountPeers());
111:
112: send(sendChannel2, "3");
113: length = receiveChannel2.readInt();
114: data = receiveChannel2.readStringByLength(length);
115: Assert.assertEquals("3", data);
116:
117: con2.close();
118: server.close();
119: }
120:
121: private void send(BodyDataSink dataSink, String msg)
122: throws IOException {
123: dataSink.markWritePosition();
124: dataSink.write((int) 0); // write a empty length field
125:
126: int written = dataSink.write(msg); // write the data
127:
128: dataSink.resetToWriteMark();
129: dataSink.write(written); // write the length field
130: dataSink.flush();
131: }
132:
133: private static final class MulticastClientHandler implements
134: IHttpResponseHandler {
135:
136: private BlockingBodyDataSource channel = null;
137:
138: private final Object guard = new Object();
139:
140: public void onResponse(HttpResponse response)
141: throws IOException {
142: synchronized (guard) {
143: channel = response.getBlockingBody();
144: guard.notifyAll();
145: }
146: }
147:
148: public int readInt() throws IOException {
149:
150: synchronized (guard) {
151:
152: do {
153: if (channel == null) {
154: try {
155: guard.wait();
156: } catch (InterruptedException ignore) {
157: }
158:
159: } else {
160: return channel.readInt();
161: }
162: } while (channel != null);
163: }
164:
165: throw new IOException();
166: }
167:
168: public String readStringByLength(int length) throws IOException {
169:
170: synchronized (guard) {
171:
172: do {
173: if (channel == null) {
174: try {
175: guard.wait();
176: } catch (InterruptedException ignore) {
177: }
178:
179: } else {
180: return channel.readStringByLength(length);
181: }
182: } while (channel != null);
183: }
184:
185: throw new IOException();
186: }
187: }
188:
189: private static final class MulticastServerHandler implements
190: IHttpRequestHandler, IHttpDisconnectHandler {
191:
192: private MulticastService multicastService = null;
193:
194: public MulticastServerHandler(MulticastService multicastService) {
195: this .multicastService = multicastService;
196: }
197:
198: @Execution(Execution.NONTHREADED)
199: public void onRequest(HttpRequest request,
200: final IHttpResponseContext httpServerEndpoint)
201: throws IOException {
202:
203: // retrieve request bpdy
204: NonBlockingBodyDataSource requestBody = request
205: .getNonBlockingBody();
206:
207: // prepare and send response
208: HttpResponseHeader responseHeader = new HttpResponseHeader(
209: 200);
210: BodyDataSink bodyDataSink = httpServerEndpoint
211: .send(responseHeader);
212:
213: multicastService.registerPeer(httpServerEndpoint.getId(),
214: bodyDataSink);
215:
216: IBodyDataHandler bodyHandler = new IBodyDataHandler() {
217:
218: public boolean onData(
219: NonBlockingBodyDataSource bodyDataSource)
220: throws BufferUnderflowException {
221:
222: try {
223: // each message start with a packet size field
224: HttpUtils
225: .validateSufficientDatasizeByIntLengthField(
226: bodyDataSource, false);
227:
228: ByteBuffer[] availableData = bodyDataSource
229: .readByteBufferByLength(bodyDataSource
230: .available());
231:
232: multicastService.sendToPeers(availableData);
233: return true;
234:
235: } catch (IOException ioe) {
236: multicastService
237: .deregisterPeer(httpServerEndpoint
238: .getId());
239: }
240:
241: return true;
242: }
243: };
244: requestBody.setDataHandler(bodyHandler);
245: }
246:
247: @Execution(Execution.NONTHREADED)
248: public boolean onDisconnect(IHttpConnection httpConnection)
249: throws IOException {
250: multicastService.deregisterPeer(httpConnection.getId());
251: return true;
252: }
253: }
254:
255: private static final class MulticastService {
256:
257: private final Map<String, WritableByteChannel> peers = new HashMap<String, WritableByteChannel>();
258:
259: synchronized int getCountPeers() {
260: return peers.size();
261: }
262:
263: synchronized void registerPeer(String id,
264: WritableByteChannel channel) {
265: peers.put(id, channel);
266: System.out.println("peer " + id + " registered");
267: }
268:
269: synchronized void deregisterPeer(String id) {
270: peers.remove(id);
271: System.out.println("peer " + id + " deregistered");
272: }
273:
274: synchronized void sendToPeers(ByteBuffer[] data) {
275:
276: for (Entry<String, WritableByteChannel> peer : peers
277: .entrySet()) {
278:
279: try {
280: for (ByteBuffer buf : data) {
281: peer.getValue().write(buf);
282: buf.flip();
283: }
284: } catch (IOException ioe) {
285: System.out
286: .println("error occured by sending data to"
287: + peer.getKey() + " "
288: + ioe.toString());
289: deregisterPeer(peer.getKey());
290: }
291: }
292: }
293: }
294:
295: }
|