001: /**
002: * EasyBeans
003: * Copyright (C) 2006 Bull S.A.S.
004: * Contact: easybeans@ow2.org
005: *
006: * This library is free software; you can redistribute it and/or
007: * modify it under the terms of the GNU Lesser General Public
008: * License as published by the Free Software Foundation; either
009: * version 2.1 of the License, or any later version.
010: *
011: * This library is distributed in the hope that it will be useful,
012: * but WITHOUT ANY WARRANTY; without even the implied warranty of
013: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
014: * Lesser General Public License for more details.
015: *
016: * You should have received a copy of the GNU Lesser General Public
017: * License along with this library; if not, write to the Free Software
018: * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
019: * USA
020: *
021: * --------------------------------------------------------------------------
022: * $Id: SmartClientEndPointComponent.java 1970 2007-10-16 11:49:25Z benoitf $
023: * --------------------------------------------------------------------------
024: */package org.ow2.easybeans.component.smartclient.server;
025:
026: import static org.ow2.easybeans.component.smartclient.api.ProtocolConstants.PROTOCOL_VERSION;
027:
028: import java.io.IOException;
029: import java.io.InputStream;
030: import java.net.URL;
031: import java.nio.ByteBuffer;
032: import java.nio.channels.ClosedChannelException;
033: import java.nio.channels.SelectionKey;
034: import java.nio.channels.Selector;
035: import java.nio.channels.ServerSocketChannel;
036: import java.nio.channels.SocketChannel;
037: import java.util.Iterator;
038: import java.util.Set;
039: import java.util.logging.Level;
040: import java.util.logging.Logger;
041:
042: import org.ow2.easybeans.component.api.EZBComponent;
043: import org.ow2.easybeans.component.api.EZBComponentException;
044: import org.ow2.easybeans.component.itf.RegistryComponent;
045: import org.ow2.easybeans.component.smartclient.api.Message;
046: import org.ow2.easybeans.component.smartclient.api.ProtocolConstants;
047: import org.ow2.easybeans.component.smartclient.message.ChannelAttachment;
048: import org.ow2.easybeans.component.smartclient.message.ClassAnswer;
049: import org.ow2.easybeans.component.smartclient.message.ClassNotFound;
050: import org.ow2.easybeans.component.smartclient.message.ClassRequest;
051: import org.ow2.easybeans.component.smartclient.message.ProviderURLAnswer;
052: import org.ow2.easybeans.component.smartclient.message.ResourceAnswer;
053: import org.ow2.easybeans.component.smartclient.message.ResourceNotFound;
054: import org.ow2.easybeans.component.smartclient.message.ResourceRequest;
055:
/**
 * This endpoint receives requests from clients, handles them and sends an
 * answer.<br>
 * For example, it sends the bytecode for a given class.
 * @author Florent Benoit
 */
062: public class SmartClientEndPointComponent implements EZBComponent,
063: Runnable {
064:
065: /**
066: * Use the JDK logger (to avoid any dependency).
067: */
068: private static Logger logger = Logger
069: .getLogger(SmartClientEndPointComponent.class.getName());
070:
071: /**
072: * Maximum length of messages that we accept.
073: */
074: private static final int MAX_LENGTH_INCOMING_MSG = 500;
075:
076: /**
077: * Default port number.
078: */
079: private static final int DEFAULT_PORT_NUMBER = 2503;
080:
081: /**
082: * Buffer length.
083: */
084: private static final int BUF_APPEND = 1000;
085:
086: /**
087: * Listening port number.
088: */
089: private int portNumber = DEFAULT_PORT_NUMBER;
090:
091: /**
092: * Nio Selector.
093: */
094: private Selector selector = null;
095:
096: /**
097: * The selection key of the server (accepting clients).
098: */
099: private SelectionKey serverkey = null;
100:
101: /**
102: * Server socket channel (listening).
103: */
104: private ServerSocketChannel server = null;
105:
106: /**
107: * Waiting ?
108: */
109: private boolean waitingSelector = true;
110:
111: /**
112: * Link to the RMI component used to get the provider URL.
113: */
114: private RegistryComponent registryComponent = null;
115:
116: /**
117: * Init method.<br/> This method is called before the start method.
118: * @throws EZBComponentException if the initialization has failed.
119: */
120: public void init() throws EZBComponentException {
121:
122: // Creates a new selector
123: try {
124: selector = Selector.open();
125: } catch (IOException e) {
126: throw new EZBComponentException(
127: "Cannot open a new selector.", e);
128: }
129: // Server socket
130: try {
131: server = ServerSocketChannel.open();
132: } catch (IOException e) {
133: throw new EZBComponentException(
134: "Cannot open a new server socket channel.", e);
135: }
136:
137: // no blocking
138: try {
139: server.configureBlocking(false);
140: } catch (IOException e) {
141: throw new EZBComponentException(
142: "Cannot configure the server socket with non-blocking mode.",
143: e);
144: }
145: }
146:
147: /**
148: * Start method.<br/> This method is called after the init method.
149: * @throws EZBComponentException if the start has failed.
150: */
151: public void start() throws EZBComponentException {
152:
153: // port number listener
154: try {
155: server.socket().bind(
156: new java.net.InetSocketAddress(portNumber));
157: } catch (IOException e) {
158: throw new EZBComponentException(
159: "Cannot listen on the port number '" + portNumber
160: + "', maybe the port is already used.", e);
161: }
162:
163: // registering
164: try {
165: serverkey = server.register(selector,
166: SelectionKey.OP_ACCEPT);
167: } catch (ClosedChannelException e) {
168: throw new EZBComponentException(
169: "Cannot register the current selector as an accepting selector waiting clients.",
170: e);
171: }
172:
173: // now wait clients
174: waitingSelector = true;
175:
176: // infinite loop
177: new Thread(this ).start();
178:
179: logger.info("SmartClient Endpoint listening on port '"
180: + portNumber + "'.");
181:
182: }
183:
184: /**
185: * Stop method.<br/> This method is called when component needs to be
186: * stopped.
187: * @throws EZBComponentException if the stop is failing.
188: */
189: public void stop() throws EZBComponentException {
190: // break infinite loop
191: waitingSelector = false;
192: selector.wakeup();
193: }
194:
195: /**
196: * Infinite loop (until the end of the component) that handle the selectors.
197: */
198: public void handleSelectors() {
199:
200: // infinite loop
201: while (waitingSelector) {
202:
203: // wait new stuff
204: int updatedKeys = 0;
205: try {
206: updatedKeys = selector.select();
207: } catch (IOException e) {
208: logger.log(Level.SEVERE,
209: "Selector has been closed, stopping listener",
210: e);
211: waitingSelector = false;
212: }
213:
214: // No update, then go ahead
215: if (updatedKeys == 0) {
216: continue;
217: }
218:
219: // Get selected keys
220: Set<SelectionKey> selectedKeys = selector.selectedKeys();
221:
222: for (Iterator<SelectionKey> itSelectedKeys = selectedKeys
223: .iterator(); itSelectedKeys.hasNext();) {
224: SelectionKey selectionKey = itSelectedKeys.next();
225: itSelectedKeys.remove(); // remove it has it was handled
226:
227: // Server key ?
228: if (selectionKey == serverkey) {
229: // New client ?
230: if (selectionKey.isAcceptable()) {
231: try {
232: handleAccept();
233: } catch (Exception e) {
234: // all exception (including runtime)
235: logger
236: .log(
237: Level.SEVERE,
238: "Unable to accept a new connection.",
239: e);
240: }
241: }
242: } else if (selectionKey.isReadable()) {
243: // get request from client
244: try {
245: handleRead(selectionKey);
246: } catch (Exception e) {
247: // all exception (including runtime)
248: logger.log(Level.SEVERE,
249: "Unable to read data from the client.",
250: e);
251: }
252: } else if (selectionKey.isWritable()) {
253: // answer to the client
254: try {
255: handleWrite(selectionKey);
256: } catch (Exception e) {
257: // all exception (including runtime)
258: logger.log(Level.SEVERE,
259: "Unable to write data to the client.",
260: e);
261: }
262: }
263: }
264: }
265: }
266:
267: /**
268: * Handle a new client that is being connected.
269: * @throws IOException if cannot accept the client
270: */
271: private void handleAccept() throws IOException {
272: // new incoming connection
273: SocketChannel client = server.accept();
274:
275: // Non blocking client
276: client.configureBlocking(false);
277:
278: // Register client (with an empty channel attachment)
279: client.register(selector, SelectionKey.OP_READ,
280: new ChannelAttachment());
281: }
282:
283: /**
284: * Handle all read operations on channels.
285: * @param selectionKey the selected key.
286: * @throws IOException if cannot read from the channel.
287: */
288: private void handleRead(final SelectionKey selectionKey)
289: throws IOException {
290: // Get the client channel that has data to read
291: SocketChannel client = (SocketChannel) selectionKey.channel();
292:
293: // current bytecode read
294: ChannelAttachment channAttachment = (ChannelAttachment) selectionKey
295: .attachment();
296: ByteBuffer channBuffer = channAttachment.getByteBuffer();
297:
298: // Read again
299: int bytesread = client.read(channBuffer);
300: if (bytesread == -1) {
301: // close (as the client has been disconnected)
302: selectionKey.cancel();
303: client.close();
304: }
305:
306: // Client send data, analyze data
307:
308: // Got header ?
309: if (channBuffer.position() >= Message.HEADER_SIZE) {
310:
311: // Yes, got header
312: // Check if it is a protocol that we manage
313: byte version = channBuffer.get(0);
314: if (version != PROTOCOL_VERSION) {
315: selectionKey.cancel();
316: client.close();
317: throw new IllegalStateException(
318: "Invalid protocol version : waiting '"
319: + PROTOCOL_VERSION + "', got '"
320: + version + "'.");
321: }
322:
323: // Get operation asked by client
324: byte opCode = channBuffer.get(1);
325:
326: // Length
327: int length = channBuffer.getInt(2);
328: if (length < 0) {
329: selectionKey.cancel();
330: client.close();
331: throw new IllegalStateException(
332: "Invalid length for client '" + length + "'.");
333: }
334:
335: if (length > MAX_LENGTH_INCOMING_MSG) {
336: selectionKey.cancel();
337: client.close();
338: throw new IllegalStateException(
339: "Length too big, max length = '"
340: + MAX_LENGTH_INCOMING_MSG
341: + "', current = '" + length + "'.");
342: }
343:
344: // Correct header and correct length ?
345: if (channBuffer.position() >= Message.HEADER_SIZE + length) {
346: // set the limit (specified in the length), else we have a
347: // default buffer limit
348: channBuffer.limit(Message.HEADER_SIZE + length);
349:
350: // duplicate this buffer
351: ByteBuffer dataBuffer = channBuffer.duplicate();
352:
353: // skip header (already analyzed)
354: dataBuffer.position(Message.HEADER_SIZE);
355:
356: // Switch on operations :
357: try {
358: switch (opCode) {
359: case ProtocolConstants.CLASS_REQUEST:
360: handleReadClassRequest(selectionKey, dataBuffer);
361: break;
362: case ProtocolConstants.RESOURCE_REQUEST:
363: handleReadResourceRequest(selectionKey,
364: dataBuffer);
365: break;
366: case ProtocolConstants.PROVIDER_URL_REQUEST:
367: handleReadProviderURLRequest(selectionKey,
368: dataBuffer);
369: break;
370: default:
371: // nothing to do
372: }
373: } catch (Exception e) {
374: // clean
375: selectionKey.cancel();
376: client.close();
377: throw new IllegalStateException(
378: "Cannot handle request with opCode '"
379: + opCode + "'.", e);
380: }
381: }
382: }
383:
384: }
385:
386: /**
387: * Handle the client's request asking for a class.
388: * @param selectionKey key for exchanging with the client.
389: * @param dataBuffer the buffer that contains request.
390: * @throws IOException if operation fails
391: */
392: private void handleReadClassRequest(
393: final SelectionKey selectionKey, final ByteBuffer dataBuffer)
394: throws IOException {
395: // Build object (from input)
396: ClassRequest classRequest = new ClassRequest(dataBuffer);
397: String className = classRequest.getClassName();
398:
399: // Answer to the client (go in write mode)
400: selectionKey.interestOps(SelectionKey.OP_WRITE);
401: String encodedClassName = className.replaceAll("\\.", "/")
402: .concat(".class");
403:
404: // Find the resource from the classloader
405: InputStream inputStream = Thread.currentThread()
406: .getContextClassLoader().getResourceAsStream(
407: encodedClassName);
408:
409: if (inputStream == null) {
410: ClassNotFound classNotFound = new ClassNotFound(className);
411: selectionKey.attach(classNotFound.getMessage());
412: if (logger.isLoggable(Level.FINE)) {
413: logger.log(Level.FINE, "Class '" + className
414: + "' not found");
415: }
416: return;
417: }
418: byte[] bytecode = null;
419: try {
420: // Get bytecode of the class
421: bytecode = readClass(inputStream);
422:
423: } finally {
424: inputStream.close();
425: }
426:
427: // Create answer object
428: ClassAnswer classAnswer = new ClassAnswer(className, bytecode);
429:
430: // Attach the answer on the key
431: selectionKey.attach(classAnswer.getMessage());
432:
433: }
434:
435: /**
436: * Handle the client's request asking for a resource.
437: * @param selectionKey key for exchanging with the client.
438: * @param dataBuffer the buffer that contains request.
439: * @throws IOException if operation fails
440: */
441: private void handleReadResourceRequest(
442: final SelectionKey selectionKey, final ByteBuffer dataBuffer)
443: throws IOException {
444:
445: // Build object (from input)
446: ResourceRequest resourceRequest = new ResourceRequest(
447: dataBuffer);
448: String resourceName = resourceRequest.getResourceName();
449:
450: // Answer to the client
451: selectionKey.interestOps(SelectionKey.OP_WRITE);
452:
453: // Find the resource from the classloader
454: URL url = Thread.currentThread().getContextClassLoader()
455: .getResource(resourceName);
456: if (url == null) {
457: ResourceNotFound resourceNotFound = new ResourceNotFound(
458: resourceName);
459: selectionKey.attach(resourceNotFound.getMessage());
460: if (logger.isLoggable(Level.FINE)) {
461: logger.log(Level.FINE, "Resource '" + resourceName
462: + "' not found");
463: }
464: return;
465: }
466: InputStream inputStream = url.openStream();
467:
468: byte[] bytes = null;
469: try {
470: // Get bytecode of the class
471: bytes = readClass(inputStream);
472:
473: } finally {
474: inputStream.close();
475: }
476:
477: // Create answer object
478: ResourceAnswer resourceAnswer = new ResourceAnswer(
479: resourceName, bytes);
480:
481: // Attach the answer on the key
482: selectionKey.attach(resourceAnswer.getMessage());
483:
484: }
485:
486: /**
487: * Handle the client's request asking for the default provider URL.
488: * @param selectionKey key for exchanging with the client.
489: * @param dataBuffer the buffer that contains request.
490: * @throws IOException if operation fails
491: */
492: private void handleReadProviderURLRequest(
493: final SelectionKey selectionKey, final ByteBuffer dataBuffer)
494: throws IOException {
495:
496: // Answer to the client (go in write mode)
497: selectionKey.interestOps(SelectionKey.OP_WRITE);
498:
499: String providerURL = registryComponent.getProviderURL();
500:
501: if (logger.isLoggable(Level.FINE)) {
502: logger.log(Level.FINE, "Provider URL asked by client : '"
503: + providerURL + "'.");
504: }
505:
506: // Create answer object
507: ProviderURLAnswer providerURLAnswer = new ProviderURLAnswer(
508: providerURL);
509:
510: // Attach the answer on the key
511: selectionKey.attach(providerURLAnswer.getMessage());
512:
513: }
514:
515: /**
516: * Handle all write operations on channels.
517: * @param selectionKey the selected key.
518: * @throws IOException if cannot write to the channel.
519: */
520: private void handleWrite(final SelectionKey selectionKey)
521: throws IOException {
522: SocketChannel channel = (SocketChannel) selectionKey.channel();
523:
524: // Write the data that was attached on the selection key
525: ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
526: if (buffer.hasRemaining()) {
527: channel.write(buffer);
528: } else {
529: // finished to write, close
530: channel.close();
531: }
532: }
533:
534: /**
535: * Gets the bytes from the given input stream.
536: * @param is given input stream.
537: * @return the array of bytes for the given input stream.
538: * @throws IOException if class cannot be read.
539: */
540: private static byte[] readClass(final InputStream is)
541: throws IOException {
542: if (is == null) {
543: throw new IOException("Given input stream is null");
544: }
545: byte[] b = new byte[is.available()];
546: int len = 0;
547: while (true) {
548: int n = is.read(b, len, b.length - len);
549: if (n == -1) {
550: if (len < b.length) {
551: byte[] c = new byte[len];
552: System.arraycopy(b, 0, c, 0, len);
553: b = c;
554: }
555: return b;
556: }
557: len += n;
558: if (len == b.length) {
559: byte[] c = new byte[b.length + BUF_APPEND];
560: System.arraycopy(b, 0, c, 0, len);
561: b = c;
562: }
563: }
564: }
565:
566: /**
567: * Launch the thread looking at the selectors.
568: */
569: public void run() {
570: handleSelectors();
571: }
572:
573: /**
574: * Sets the port number of this protocol.
575: * @param portNumber the port for listening
576: */
577: public void setPortNumber(final int portNumber) {
578: this .portNumber = portNumber;
579: }
580:
581: /**
582: * Sets the registry component.
583: * @param registryComponent the given component.
584: */
585: public void setRegistryComponent(
586: final RegistryComponent registryComponent) {
587: this.registryComponent = registryComponent;
588: }
589:
590: }
|