001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.oil2;
023:
024: import java.io.BufferedInputStream;
025: import java.io.BufferedOutputStream;
026: import java.io.IOException;
027: import java.io.ObjectInputStream;
028: import java.io.ObjectOutputStream;
029: import java.lang.reflect.Method;
030: import java.net.InetAddress;
031: import java.net.ServerSocket;
032: import java.net.Socket;
033: import java.net.SocketException;
034: import java.net.UnknownHostException;
035: import java.rmi.RemoteException;
036: import java.util.Properties;
037:
038: import javax.jms.Destination;
039: import javax.jms.JMSException;
040: import javax.naming.InitialContext;
041: import javax.net.ServerSocketFactory;
042:
043: import org.jboss.logging.Logger;
044: import org.jboss.mq.AcknowledgementRequest;
045: import org.jboss.mq.ConnectionToken;
046: import org.jboss.mq.DurableSubscriptionID;
047: import org.jboss.mq.SpyDestination;
048: import org.jboss.mq.SpyMessage;
049: import org.jboss.mq.Subscription;
050: import org.jboss.mq.TransactionRequest;
051: import org.jboss.mq.il.Invoker;
052: import org.jboss.mq.il.ServerIL;
053: import org.jboss.security.SecurityDomain;
054:
055: /**
056: * Implements the ServerILJMXService which is used to manage the OIL2 IL.
057: *
058: * @author <a href="mailto:hiram.chirino@jboss.org">Hiram Chirino</a>
059: * @version $Revision: 1.22 $
060: * @jmx:mbean extends="org.jboss.mq.il.ServerILJMXServiceMBean"
061: */
062: public final class OIL2ServerILService extends
063: org.jboss.mq.il.ServerILJMXService implements
064: java.lang.Runnable, OIL2ServerILServiceMBean {
/**
 * Logger shared by the service and by its per-connection request listeners.
 */
final static private Logger log = Logger
.getLogger(OIL2ServerILService.class);

/**
 * Timeout, in milliseconds, applied to the listening socket in
 * startService(). It makes accept() return periodically so that run()
 * can re-check the running flag.
 */
private final static int SO_TIMEOUT = 5000;

/**
 * The JMS server that client requests are forwarded to. Assigned in
 * startService() from lookupJMSServer().
 */
private Invoker server;

/**
 * Whether TCP_NODELAY is set on accepted client sockets. The same value
 * is advertised to clients through the connection properties.
 */
private boolean enableTcpNoDelay = false;

/**
 * The JNDI name of the security domain to use with SSL aware socket
 * factories; null when no domain is configured.
 */
private String securityDomain;

/**
 * The javax.net.SocketFactory implementation class name to use on the
 * client. Handed to the OIL2ServerIL built in startService().
 */
private String clientSocketFactoryName;
/**
 * The socket factory used to obtain the server socket. When still null
 * at start time, startService() falls back to
 * ServerSocketFactory.getDefault().
 */
private ServerSocketFactory serverSocketFactory;
/**
 * The listening socket that receives incoming connections
 * for servicing. Created in startService().
 */
private ServerSocket serverSocket;

/**
 * The managed serverIL, built in startService() and returned by
 * getServerIL().
 */
private OIL2ServerIL serverIL;

/**
 * The running flag polled by the accept loop in run() to determine
 * whether the service should keep going. Set in startService() and
 * cleared in stopService().
 */
private volatile boolean running;

/**
 * The server port to bind to. The default of 0 is passed unchanged to
 * the server socket factory.
 */
private int serverBindPort = 0;

/**
 * The internet address to bind to. The default of null is reported as
 * "0.0.0.0" by getBindAddress().
 */
private InetAddress bindAddress = null;

/**
 * The connection properties passed to the client to connect to this IL.
 * Populated in startService().
 */
private Properties connectionProperties;
132:
133: public class RequestListner implements OIL2RequestListner {
134:
135: Socket socket;
136: ObjectInputStream in;
137: ObjectOutputStream out;
138: OIL2SocketHandler socketHandler;
139: ConnectionToken connectionToken;
140: boolean closing = false;
141:
142: RequestListner(Socket socket) throws IOException {
143: socket.setSoTimeout(0);
144: socket.setTcpNoDelay(enableTcpNoDelay);
145: out = new ObjectOutputStream(new BufferedOutputStream(
146: socket.getOutputStream()));
147: out.flush();
148: in = new ObjectInputStream(new BufferedInputStream(socket
149: .getInputStream()));
150: }
151:
152: public void handleRequest(OIL2Request request) {
153: // if( log.isTraceEnabled() )
154: // log.trace("RequestListner handing request: "+request);
155:
156: if (closing) {
157: log
158: .trace("A connection that is closing received another request. Droping request.");
159: return;
160: }
161:
162: Object result = null;
163: Exception resultException = null;
164:
165: // now based upon the input directive, preform the
166: // requested action. Any exceptions are processed
167: // and potentially returned to the client.
168: //
169: try {
170: switch (request.operation) {
171: case OIL2Constants.SERVER_SET_SPY_DISTRIBUTED_CONNECTION:
172: connectionToken = (ConnectionToken) request.arguments[0];
173: // Make the client IL aware of us since he will be using our requestHander
174: // To make requests.
175: ((OIL2ClientIL) connectionToken.clientIL)
176: .setRequestListner(this );
177: break;
178:
179: case OIL2Constants.SERVER_ACKNOWLEDGE:
180: server
181: .acknowledge(
182: connectionToken,
183: (AcknowledgementRequest) request.arguments[0]);
184: break;
185:
186: case OIL2Constants.SERVER_ADD_MESSAGE:
187: server.addMessage(connectionToken,
188: (SpyMessage) request.arguments[0]);
189: break;
190:
191: case OIL2Constants.SERVER_BROWSE:
192: result = server.browse(connectionToken,
193: (Destination) request.arguments[0],
194: (String) request.arguments[1]);
195: break;
196:
197: case OIL2Constants.SERVER_CHECK_ID:
198: server.checkID((String) request.arguments[0]);
199: if (connectionToken != null)
200: connectionToken
201: .setClientID((String) request.arguments[0]);
202: break;
203:
204: case OIL2Constants.SERVER_CONNECTION_CLOSING:
205: beginClose();
206: break;
207:
208: case OIL2Constants.SERVER_CREATE_QUEUE:
209: result = server.createQueue(connectionToken,
210: (String) request.arguments[0]);
211: break;
212:
213: case OIL2Constants.SERVER_CREATE_TOPIC:
214: result = server.createTopic(connectionToken,
215: (String) request.arguments[0]);
216: break;
217:
218: case OIL2Constants.SERVER_DELETE_TEMPORARY_DESTINATION:
219: server.deleteTemporaryDestination(connectionToken,
220: (SpyDestination) request.arguments[0]);
221: break;
222:
223: case OIL2Constants.SERVER_GET_ID:
224: result = server.getID();
225: if (connectionToken != null)
226: connectionToken.setClientID((String) result);
227: break;
228:
229: case OIL2Constants.SERVER_GET_TEMPORARY_QUEUE:
230: result = server.getTemporaryQueue(connectionToken);
231: break;
232:
233: case OIL2Constants.SERVER_GET_TEMPORARY_TOPIC:
234: result = server.getTemporaryTopic(connectionToken);
235: break;
236:
237: case OIL2Constants.SERVER_RECEIVE:
238: result = server
239: .receive(connectionToken,
240: ((Integer) request.arguments[0])
241: .intValue(),
242: ((Long) request.arguments[1])
243: .longValue());
244: break;
245:
246: case OIL2Constants.SERVER_SET_ENABLED:
247: server.setEnabled(connectionToken,
248: ((Boolean) request.arguments[0])
249: .booleanValue());
250: break;
251:
252: case OIL2Constants.SERVER_SUBSCRIBE:
253: server.subscribe(connectionToken,
254: (Subscription) request.arguments[0]);
255: break;
256:
257: case OIL2Constants.SERVER_TRANSACT:
258: server.transact(connectionToken,
259: (TransactionRequest) request.arguments[0]);
260: break;
261:
262: case OIL2Constants.SERVER_UNSUBSCRIBE:
263: server
264: .unsubscribe(connectionToken,
265: ((Integer) request.arguments[0])
266: .intValue());
267: break;
268:
269: case OIL2Constants.SERVER_DESTROY_SUBSCRIPTION:
270: server
271: .destroySubscription(
272: connectionToken,
273: (DurableSubscriptionID) request.arguments[0]);
274: break;
275:
276: case OIL2Constants.SERVER_CHECK_USER:
277: result = server.checkUser(
278: (String) request.arguments[0],
279: (String) request.arguments[1]);
280: break;
281:
282: case OIL2Constants.SERVER_PING:
283: server.ping(connectionToken,
284: ((Long) request.arguments[0]).longValue());
285: break;
286:
287: case OIL2Constants.SERVER_AUTHENTICATE:
288: result = server.authenticate(
289: (String) request.arguments[0],
290: (String) request.arguments[1]);
291: break;
292:
293: default:
294: throw new RemoteException("Bad method code !");
295: } // switch
296: } catch (Exception e) {
297: resultException = e;
298: } // try
299:
300: try {
301: OIL2Response response = new OIL2Response(request);
302: response.result = result;
303: response.exception = resultException;
304: socketHandler.sendResponse(response);
305: } catch (IOException e) {
306: handleConnectionException(e);
307: }
308: }
309:
310: public void handleConnectionException(Exception e) {
311: if (!closing)
312: log.info("Client Disconnected: " + e);
313: beginClose();
314: }
315:
316: void beginClose() {
317: closing = true;
318: try {
319: if (connectionToken != null)
320: server.connectionClosing(connectionToken);
321: } catch (JMSException ignore) {
322: } finally {
323: close();
324: }
325: }
326:
327: void close() {
328: try {
329: if (socket != null) {
330: socketHandler.stop();
331: in.close();
332: out.close();
333: socket.close();
334: socket = null;
335: }
336: } catch (IOException e) {
337: log
338: .debug(
339: "Exception occured while closing opened resources: ",
340: e);
341: }
342: }
343:
344: public OIL2SocketHandler getSocketHandler() {
345: return socketHandler;
346: }
347:
348: }
349:
350: /**
351: * Used to construct the GenericConnectionFactory (bindJNDIReferences()
352: * builds it) Sets up the connection properties need by a client to use this
353: * IL
354: *
355: * @return The ClientConnectionProperties value
356: */
357: public java.util.Properties getClientConnectionProperties() {
358: return connectionProperties;
359: }
360:
361: /**
362: * Gives this JMX service a name.
363: *
364: * @return The Name value
365: */
366: public String getName() {
367: return "JBossMQ-OILServerIL";
368: }
369:
370: /**
371: * Used to construct the GenericConnectionFactory (bindJNDIReferences()
372: * builds it)
373: *
374: * @return The ServerIL value
375: * @returns ServerIL the instance of this IL
376: */
377: public ServerIL getServerIL() {
378: return serverIL;
379: }
380:
381: /**
382: * Main processing method for the OILServerILService object
383: */
384: public void run() {
385: try {
386: while (running) {
387: Socket socket = null;
388: try {
389: socket = serverSocket.accept();
390: if (log.isTraceEnabled())
391: log.trace("Accepted connection: " + socket);
392: } catch (java.io.InterruptedIOException e) {
393: // It's ok, this is due to the SO_TIME_OUT
394: continue;
395: }
396:
397: // it's possible that the service is no longer
398: // running but it got a connection, no point in
399: // starting up a thread!
400: //
401: if (!running) {
402: if (socket != null) {
403: try {
404: socket.close();
405: } catch (Exception ignore) {
406: }
407: }
408: return;
409: }
410:
411: try {
412:
413: if (log.isTraceEnabled())
414: log
415: .trace("Initializing RequestListner for socket: "
416: + socket);
417: RequestListner requestListner = new RequestListner(
418: socket);
419: OIL2SocketHandler socketHandler = new OIL2SocketHandler(
420: requestListner.in, requestListner.out,
421: Thread.currentThread().getThreadGroup());
422: requestListner.socketHandler = socketHandler;
423: socketHandler.setRequestListner(requestListner);
424: socketHandler.start();
425:
426: } catch (IOException ie) {
427: log
428: .debug(
429: "Client connection could not be accepted: ",
430: ie);
431: }
432: }
433: } catch (SocketException e) {
434: // There is no easy way (other than string comparison) to
435: // determine if the socket exception is caused by connection
436: // reset by peer. In this case, it's okay to ignore both
437: // SocketException and IOException.
438: if (running)
439: log
440: .warn("SocketException occured (Connection reset by peer?). Cannot initialize the OIL2ServerILService.");
441: } catch (IOException e) {
442: if (running)
443: log
444: .warn("IOException occured. Cannot initialize the OIL2ServerILService.");
445: } catch (Throwable t) {
446: log
447: .warn(
448: "Unexpected error occured. Cannot initialize the OIL2ServerILService.",
449: t);
450: }
451: try {
452: serverSocket.close();
453: } catch (Exception e) {
454: log.debug("error closing server socket", e);
455: }
456: return;
457: }
458:
459: /**
460: * Starts this IL, and binds it to JNDI
461: *
462: * @exception Exception Description of Exception
463: */
464: public void startService() throws Exception {
465: super .startService();
466:
467: running = true;
468: this .server = lookupJMSServer();
469:
470: // Use the default javax.net.ServerSocketFactory if none was set
471: if (serverSocketFactory == null)
472: serverSocketFactory = ServerSocketFactory.getDefault();
473:
474: /* See if the server socket supports setSecurityDomain(SecurityDomain)
475: if an securityDomain was specified
476: */
477: if (securityDomain != null) {
478: try {
479: InitialContext ctx = new InitialContext();
480: Class ssfClass = serverSocketFactory.getClass();
481: SecurityDomain domain = (SecurityDomain) ctx
482: .lookup(securityDomain);
483: Class[] parameterTypes = { SecurityDomain.class };
484: Method m = ssfClass.getMethod("setSecurityDomain",
485: parameterTypes);
486: Object[] args = { domain };
487: m.invoke(serverSocketFactory, args);
488: } catch (NoSuchMethodException e) {
489: log
490: .error("Socket factory does not support setSecurityDomain(SecurityDomain)");
491: } catch (Exception e) {
492: log.error("Failed to setSecurityDomain="
493: + securityDomain + " on socket factory");
494: }
495: }
496:
497: // Create the server socket using the socket factory
498: serverSocket = serverSocketFactory.createServerSocket(
499: serverBindPort, 50, bindAddress);
500: serverSocket.setSoTimeout(SO_TIMEOUT);
501:
502: InetAddress socketAddress = serverSocket.getInetAddress();
503: log.info("JBossMQ OIL2 service available at : " + socketAddress
504: + ":" + serverSocket.getLocalPort());
505:
506: new Thread(server.getThreadGroup(), this , "OIL2 Worker Server")
507: .start();
508: /* We need to check the socketAddress against "0.0.0.0/0.0.0.0"
509: because this is not a valid address on Win32 while it is for
510: *NIX. See BugParade bug #4343286.
511: */
512: if (socketAddress.toString().equals("0.0.0.0/0.0.0.0"))
513: socketAddress = InetAddress.getLocalHost();
514:
515: serverIL = new OIL2ServerIL(socketAddress.getHostAddress(),
516: serverSocket.getLocalPort(), clientSocketFactoryName,
517: enableTcpNoDelay);
518:
519: // Initialize the connection poperties using the base class.
520: connectionProperties = super .getClientConnectionProperties();
521: connectionProperties.setProperty(
522: OIL2ServerILFactory.CLIENT_IL_SERVICE_KEY,
523: "org.jboss.mq.il.oil2.OIL2ClientILService");
524: connectionProperties.setProperty(
525: OIL2ServerILFactory.OIL2_PORT_KEY, ""
526: + serverSocket.getLocalPort());
527: connectionProperties.setProperty(
528: OIL2ServerILFactory.OIL2_ADDRESS_KEY, ""
529: + socketAddress.getHostAddress());
530: connectionProperties.setProperty(
531: OIL2ServerILFactory.OIL2_TCPNODELAY_KEY,
532: enableTcpNoDelay ? "yes" : "no");
533:
534: bindJNDIReferences();
535:
536: }
537:
538: /**
539: * Stops this IL, and unbinds it from JNDI.
540: */
541: public void stopService() {
542: try {
543: unbindJNDIReferences();
544: } catch (Exception e) {
545: log.error("Exception unbinding from JNDI", e);
546: }
547: try {
548: running = false;
549: if (serverSocket != null)
550: serverSocket.close();
551: } catch (Exception e) {
552: log.debug("Exception stopping server thread", e);
553: }
554: }
555:
556: /**
557: * Getter for property serverBindPort.
558: *
559: * @return Value of property serverBindPort.
560: * @jmx:managed-attribute
561: */
562: public int getServerBindPort() {
563: return serverBindPort;
564: }
565:
566: /**
567: * Setter for property serverBindPort.
568: *
569: * @param serverBindPort New value of property serverBindPort.
570: * @jmx:managed-attribute
571: */
572: public void setServerBindPort(int serverBindPort) {
573: this .serverBindPort = serverBindPort;
574: }
575:
576: /**
577: * Get the interface address the OIL server bind its listening port on.
578: *
579: * @return The hostname or dotted decimal address that the service is
580: * bound to.
581: * @jmx:managed-attribute
582: */
583: public String getBindAddress() {
584: String addr = "0.0.0.0";
585: if (bindAddress != null)
586: addr = bindAddress.getHostName();
587: return addr;
588: }
589:
590: /**
591: * Set the interface address the OIL server bind its listening port on.
592: *
593: * @param host The host address to bind to, if any.
594: *
595: * @throws java.net.UnknownHostException Thrown if the hostname cannot
596: * be resolved to an InetAddress object.
597: * @jmx:managed-attribute
598: */
599: public void setBindAddress(String host) throws UnknownHostException {
600: // If host is null or empty use any address
601: if (host == null || host.length() == 0)
602: bindAddress = null;
603: else
604: bindAddress = InetAddress.getByName(host);
605: }
606:
607: /**
608: * Gets the enableTcpNoDelay.
609: * @return Returns a boolean
610: * @jmx:managed-attribute
611: */
612: public boolean getEnableTcpNoDelay() {
613: return enableTcpNoDelay;
614: }
615:
616: /**
617: * Sets the enableTcpNoDelay.
618: * @param enableTcpNoDelay The enableTcpNoDelay to set
619: * @jmx:managed-attribute
620: */
621: public void setEnableTcpNoDelay(boolean enableTcpNoDelay) {
622: this .enableTcpNoDelay = enableTcpNoDelay;
623: }
624:
625: /** Get the javax.net.SocketFactory implementation class to use on the
626: *client.
627: * @jmx:managed-attribute
628: */
629: public String getClientSocketFactory() {
630: return clientSocketFactoryName;
631: }
632:
633: /** Set the javax.net.SocketFactory implementation class to use on the
634: *client.
635: * @jmx:managed-attribute
636: */
637: public void setClientSocketFactory(String name) {
638: this .clientSocketFactoryName = name;
639: }
640:
641: /** Set the javax.net.ServerSocketFactory implementation class to use to
642: *create the service SocketFactory.
643: *@jmx:managed-attribute
644: */
645: public void setServerSocketFactory(String name) throws Exception {
646: ClassLoader loader = Thread.currentThread()
647: .getContextClassLoader();
648: Class ssfClass = loader.loadClass(name);
649: serverSocketFactory = (ServerSocketFactory) ssfClass
650: .newInstance();
651: }
652:
653: /** Get the javax.net.ServerSocketFactory implementation class to use to
654: *create the service SocketFactory.
655: *@jmx:managed-attribute
656: */
657: public String getServerSocketFactory() {
658: String name = null;
659: if (serverSocketFactory != null)
660: name = serverSocketFactory.getClass().getName();
661: return name;
662: }
663:
664: /** Set the security domain name to use with SSL aware socket factories
665: *@jmx:managed-attribute
666: */
667: public void setSecurityDomain(String domainName) {
668: this .securityDomain = domainName;
669: }
670:
671: /** Get the security domain name to use with SSL aware socket factories
672: *@jmx:managed-attribute
673: */
674: public String getSecurityDomain() {
675: return this .securityDomain;
676: }
677: }
678: // vim:expandtab:tabstop=3:shiftwidth=3
|