001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.mq.il.oil;
023:
024: import java.io.BufferedInputStream;
025: import java.io.BufferedOutputStream;
026: import java.io.EOFException;
027: import java.io.IOException;
028: import java.io.ObjectInputStream;
029: import java.io.ObjectOutputStream;
030: import java.lang.reflect.Method;
031: import java.net.InetAddress;
032: import java.net.ServerSocket;
033: import java.net.Socket;
034: import java.net.SocketException;
035: import java.net.UnknownHostException;
036: import java.rmi.RemoteException;
037: import java.util.Properties;
038:
039: import javax.jms.Destination;
040: import javax.jms.JMSException;
041: import javax.naming.InitialContext;
042: import javax.net.ServerSocketFactory;
043:
044: import org.jboss.logging.Logger;
045: import org.jboss.mq.AcknowledgementRequest;
046: import org.jboss.mq.ConnectionToken;
047: import org.jboss.mq.DurableSubscriptionID;
048: import org.jboss.mq.SpyDestination;
049: import org.jboss.mq.SpyMessage;
050: import org.jboss.mq.Subscription;
051: import org.jboss.mq.TransactionRequest;
052: import org.jboss.mq.il.Invoker;
053: import org.jboss.mq.il.ServerIL;
054: import org.jboss.security.SecurityDomain;
055: import org.jboss.system.server.ServerConfigUtil;
056:
057: /**
058: * Implements the ServerILJMXService which is used to manage the JVM IL.
059: *
060: * @author Hiram Chirino (Cojonudo14@hotmail.com)
061: * @version $Revision: 57198 $
062: *
063: * @jmx:mbean extends="org.jboss.mq.il.ServerILJMXServiceMBean"
064: */
065: public final class OILServerILService extends
066: org.jboss.mq.il.ServerILJMXService implements
067: java.lang.Runnable, OILServerILServiceMBean {
068: /**
069: * logger instance.
070: */
071: final static private Logger log = Logger
072: .getLogger(OILServerILService.class);
073:
074: /**
075: * The default timeout for the server socket. This is
076: * set so the socket will periodically return to check
077: * the running flag.
078: */
079: private final static int SO_TIMEOUT = 5000;
080:
081: /**
082: * The JMS server where requests are forwarded to.
083: */
084: //private static JMSServerInvoker server;
085: private Invoker server;
086:
087: /**
088: * If the TcpNoDelay option should be used on the socket.
089: */
090: private boolean enableTcpNoDelay = false;
091:
092: /**
093: * The read timeout
094: */
095: private int readTimeout = 0;
096:
097: /** The security domain name to use with SSL aware socket factories.
098: */
099: private String securityDomain;
100:
101: /* The javax.net.SocketFactory implementation class to use on the client.
102: */
103: private String clientSocketFactoryName;
104: /** The socket factory used to obtain the server socket.
105: */
106: private ServerSocketFactory serverSocketFactory;
107: /**
108: * The listening socket that receives incomming connections
109: * for servicing.
110: */
111: private ServerSocket serverSocket;
112:
113: /**
114: * The managed serverIL.
115: */
116: private OILServerIL serverIL;
117:
118: /**
119: * The running flag that all worker and server
120: * threads check to determine if the service should
121: * be stopped.
122: */
123: private volatile boolean running;
124:
125: /**
126: * The server port to bind to.
127: */
128: private int serverBindPort = 0;
129:
130: /**
131: * The internet address to bind to by
132: * default.
133: */
134: private InetAddress bindAddress = null;
135:
136: /**
137: * Number of OIL Worker threads started.
138: */
139: private int threadNumber = 0;
140:
141: /**
142: * The connection properties passed to the client to connect to this IL
143: */
144: private Properties connectionProperties;
145:
146: /**
147: * This class is used to encapsulate the basic connection and
148: * work for a connected client thread. The run() method of this
149: * class processes requests and sends responses to and from a
150: * single client. All requests are forwarded to the outer class'
151: * JMS server instance.
152: *
153: * @author Brian Weaver (weave@opennms.org)
154: */
155: private final class Client implements Runnable {
156: /**
157: * The TCP/IP socket for communications.
158: */
159: private Socket sock;
160:
161: /**
162: * The object output stream running on top of the
163: * socket's output stream.
164: */
165: private ObjectOutputStream out;
166:
167: /**
168: * The objec5t input stream running on top of the
169: * socket's input stream
170: */
171: private ObjectInputStream in;
172:
173: /**
174: * Allocates a new runnable instance to process requests from
175: * the server's client and pass them to the JMS server. The
176: * passed socket is used for all communications between the
177: * service and the client.
178: *
179: * @param s The socket used for communications.
180: *
181: * @throws java.io.IOException Thrown if an I/O error occurs
182: * constructing the object streams.
183: */
184: Client(Socket s) throws IOException {
185: this .sock = s;
186: this .out = new ObjectOutputStream(new BufferedOutputStream(
187: this .sock.getOutputStream()));
188: this .out.flush();
189: this .in = new ObjectInputStream(new BufferedInputStream(
190: this .sock.getInputStream()));
191: sock.setTcpNoDelay(enableTcpNoDelay);
192: if (log.isTraceEnabled())
193: log.trace("Setting TcpNoDelay Option to:"
194: + enableTcpNoDelay);
195: }
196:
197: /**
198: * The main threads processing routine. This loop processes
199: * requests from the server and sends the appropriate responses
200: * based upon the results.
201: */
202: public void run() {
203: int code = 0;
204: boolean closed = false;
205: ConnectionToken connectionToken = null;
206:
207: while (!closed && running) {
208: try {
209: // read in the next request/directive
210: // from the client
211: //
212: code = in.readByte();
213: } catch (EOFException e) {
214: // end of file, exit processing loop
215: //
216: break;
217: } catch (IOException e) {
218: if (closed || !running) {
219: // exit out of the loop if the connection
220: // is closed or the service is shutdown
221: //
222: break;
223: }
224: log.warn("Connection failure (1).", e);
225: break;
226: }
227:
228: // now based upon the input directive, preform the
229: // requested action. Any exceptions are processed
230: // and potentially returned to the client.
231: //
232: try {
233: Object result = null;
234:
235: switch (code) {
236: case OILConstants.SET_SPY_DISTRIBUTED_CONNECTION:
237: // assert connectionToken == null
238: connectionToken = (ConnectionToken) in
239: .readObject();
240: break;
241:
242: case OILConstants.ACKNOWLEDGE:
243: AcknowledgementRequest ack = new AcknowledgementRequest();
244: ack.readExternal(in);
245: server.acknowledge(connectionToken, ack);
246: break;
247:
248: case OILConstants.ADD_MESSAGE:
249: server.addMessage(connectionToken, SpyMessage
250: .readMessage(in));
251: break;
252:
253: case OILConstants.BROWSE:
254: result = server.browse(connectionToken,
255: (Destination) in.readObject(),
256: (String) in.readObject());
257: break;
258:
259: case OILConstants.CHECK_ID:
260: String ID = (String) in.readObject();
261: server.checkID(ID);
262: if (connectionToken != null)
263: connectionToken.setClientID(ID);
264: break;
265:
266: case OILConstants.CONNECTION_CLOSING:
267: server.connectionClosing(connectionToken);
268: closed = true;
269: break;
270:
271: case OILConstants.CREATE_QUEUE:
272: result = server.createQueue(connectionToken,
273: (String) in.readObject());
274: break;
275:
276: case OILConstants.CREATE_TOPIC:
277: result = server.createTopic(connectionToken,
278: (String) in.readObject());
279: break;
280:
281: case OILConstants.DELETE_TEMPORARY_DESTINATION:
282: server.deleteTemporaryDestination(
283: connectionToken, (SpyDestination) in
284: .readObject());
285: break;
286:
287: case OILConstants.GET_ID:
288: result = server.getID();
289: if (connectionToken != null)
290: connectionToken
291: .setClientID((String) result);
292: break;
293:
294: case OILConstants.GET_TEMPORARY_QUEUE:
295: result = server
296: .getTemporaryQueue(connectionToken);
297: break;
298:
299: case OILConstants.GET_TEMPORARY_TOPIC:
300: result = server
301: .getTemporaryTopic(connectionToken);
302: break;
303:
304: case OILConstants.RECEIVE:
305: result = server.receive(connectionToken, in
306: .readInt(), in.readLong());
307: break;
308:
309: case OILConstants.SET_ENABLED:
310: server.setEnabled(connectionToken, in
311: .readBoolean());
312: break;
313:
314: case OILConstants.SUBSCRIBE:
315: server.subscribe(connectionToken,
316: (Subscription) in.readObject());
317: break;
318:
319: case OILConstants.TRANSACT:
320: TransactionRequest trans = new TransactionRequest();
321: trans.readExternal(in);
322: server.transact(connectionToken, trans);
323: break;
324:
325: case OILConstants.UNSUBSCRIBE:
326: server.unsubscribe(connectionToken, in
327: .readInt());
328: break;
329:
330: case OILConstants.DESTROY_SUBSCRIPTION:
331: server
332: .destroySubscription(connectionToken,
333: (DurableSubscriptionID) in
334: .readObject());
335: break;
336:
337: case OILConstants.CHECK_USER:
338: result = server
339: .checkUser((String) in.readObject(),
340: (String) in.readObject());
341: break;
342:
343: case OILConstants.PING:
344: server.ping(connectionToken, in.readLong());
345: break;
346:
347: case OILConstants.AUTHENTICATE:
348: result = server
349: .authenticate((String) in.readObject(),
350: (String) in.readObject());
351: break;
352:
353: default:
354: throw new RemoteException("Bad method code !");
355: }
356:
357: //Everthing was OK, otherwise
358: //an exception would have prevented us from getting
359: //to this point in the code ;-)
360: //
361: try {
362: if (result == null) {
363: out.writeByte(OILConstants.SUCCESS);
364: } else {
365: out.writeByte(OILConstants.SUCCESS_OBJECT);
366: out.writeObject(result);
367: out.reset();
368: }
369: out.flush();
370: } catch (IOException e) {
371: if (closed)
372: break;
373:
374: log.warn("Connection failure (2).", e);
375: break;
376: }
377:
378: } catch (Exception e) {
379: // this processes any exceptions from the actually switch
380: // statement processing
381: //
382: if (closed)
383: break;
384:
385: log
386: .warn(
387: "Client request resulted in a server exception: ",
388: e);
389:
390: try {
391: out.writeByte(OILConstants.EXCEPTION);
392: out.writeObject(e);
393: out.reset();
394: out.flush();
395: } catch (IOException e2) {
396: if (closed)
397: break;
398:
399: log.warn("Connection failure (3).", e);
400: break;
401: }
402: } // end catch of try { switch(code) ... }
403: } // end whlie loop
404:
405: try {
406: if (!closed) {
407: try {
408: server.connectionClosing(connectionToken);
409: } catch (JMSException e) {
410: // do nothing
411: }
412: }
413: in.close();
414: out.close();
415: } catch (IOException e) {
416: log.warn("Connection failure during connection close.",
417: e);
418: } finally {
419: try {
420: sock.close();
421: } catch (IOException e) {
422: log
423: .warn(
424: "Connection failure during connection close.",
425: e);
426: }
427: }
428: } // end run method
429: }
430:
431: /**
432: * Used to construct the GenericConnectionFactory (bindJNDIReferences()
433: * builds it) Sets up the connection properties need by a client to use this
434: * IL
435: *
436: * @return The ClientConnectionProperties value
437: */
438: public java.util.Properties getClientConnectionProperties() {
439: return connectionProperties;
440: }
441:
442: /**
443: * Gives this JMX service a name.
444: *
445: * @return The Name value
446: */
447: public String getName() {
448: return "JBossMQ-OILServerIL";
449: }
450:
451: /**
452: * Used to construct the GenericConnectionFactory (bindJNDIReferences()
453: * builds it)
454: *
455: * @return The ServerIL value
456: * @returns ServerIL the instance of this IL
457: */
458: public ServerIL getServerIL() {
459: return serverIL;
460: }
461:
462: /**
463: * Main processing method for the OILServerILService object
464: */
465: public void run() {
466: try {
467: while (running) {
468: Socket socket = null;
469: try {
470: socket = serverSocket.accept();
471: } catch (java.io.InterruptedIOException e) {
472: continue;
473: }
474:
475: // it's possible that the service is no longer
476: // running but it got a connection, no point in
477: // starting up a thread!
478: //
479: if (!running) {
480: if (socket != null) {
481: try {
482: socket.close();
483: } catch (Exception e) {
484: // do nothing
485: }
486: }
487: return;
488: }
489:
490: try {
491: socket.setSoTimeout(readTimeout);
492: new Thread(new Client(socket), "OIL Worker-"
493: + threadNumber++).start();
494: } catch (IOException ie) {
495: if (log.isDebugEnabled()) {
496: log
497: .debug(
498: "IOException processing client connection",
499: ie);
500: log
501: .debug("Dropping client connection, server will not terminate");
502: }
503: }
504: }
505: } catch (SocketException e) {
506: // There is no easy way (other than string comparison) to
507: // determine if the socket exception is caused by connection
508: // reset by peer. In this case, it's okay to ignore both
509: // SocketException and IOException.
510: if (running)
511: log
512: .warn("SocketException occured (Connection reset by peer?). Cannot initialize the OILServerILService.");
513: } catch (IOException e) {
514: if (running)
515: log
516: .warn(
517: "IOException occured. Cannot initialize the OILServerILService.",
518: e);
519: } catch (Throwable t) {
520: log
521: .warn(
522: "Unexpected error occured. Cannot initialize the OILServerILService.",
523: t);
524: }
525: try {
526: serverSocket.close();
527: } catch (Exception e) {
528: log.debug("error closing server socket", e);
529: }
530: return;
531: }
532:
533: /**
534: * Starts this IL, and binds it to JNDI
535: *
536: * @exception Exception Description of Exception
537: */
538: public void startService() throws Exception {
539: super .startService();
540:
541: running = true;
542: this .server = lookupJMSServer();
543:
544: // Use the default javax.net.ServerSocketFactory if none was set
545: if (serverSocketFactory == null)
546: serverSocketFactory = ServerSocketFactory.getDefault();
547:
548: /* See if the server socket supports setSecurityDomain(SecurityDomain)
549: if an securityDomain was specified
550: */
551: if (securityDomain != null) {
552: try {
553: InitialContext ctx = new InitialContext();
554: Class ssfClass = serverSocketFactory.getClass();
555: SecurityDomain domain = (SecurityDomain) ctx
556: .lookup(securityDomain);
557: Class[] parameterTypes = { SecurityDomain.class };
558: Method m = ssfClass.getMethod("setSecurityDomain",
559: parameterTypes);
560: Object[] args = { domain };
561: m.invoke(serverSocketFactory, args);
562: } catch (NoSuchMethodException e) {
563: log
564: .error("Socket factory does not support setSecurityDomain(SecurityDomain)");
565: } catch (Exception e) {
566: log.error("Failed to setSecurityDomain="
567: + securityDomain + " on socket factory");
568: }
569: }
570:
571: // Create the server socket using the socket factory
572: serverSocket = serverSocketFactory.createServerSocket(
573: serverBindPort, 50, bindAddress);
574: serverSocket.setSoTimeout(SO_TIMEOUT);
575:
576: InetAddress socketAddress = serverSocket.getInetAddress();
577: if (log.isInfoEnabled())
578: log
579: .info("JBossMQ OIL service available at : "
580: + socketAddress + ":"
581: + serverSocket.getLocalPort());
582:
583: new Thread(server.getThreadGroup(), this , "OIL Worker Server")
584: .start();
585:
586: /* We need to check the socketAddress against "0.0.0.0/0.0.0.0"
587: because this is not a valid address on Win32 while it is for
588: *NIX. See BugParade bug #4343286.
589: */
590: socketAddress = ServerConfigUtil
591: .fixRemoteAddress(socketAddress);
592:
593: serverIL = new OILServerIL(socketAddress, serverSocket
594: .getLocalPort(), clientSocketFactoryName,
595: enableTcpNoDelay);
596:
597: // Initialize the connection poperties using the base class.
598: connectionProperties = super .getClientConnectionProperties();
599: connectionProperties.setProperty(
600: OILServerILFactory.CLIENT_IL_SERVICE_KEY,
601: "org.jboss.mq.il.oil.OILClientILService");
602: connectionProperties.setProperty(
603: OILServerILFactory.OIL_PORT_KEY, ""
604: + serverSocket.getLocalPort());
605: connectionProperties.setProperty(
606: OILServerILFactory.OIL_ADDRESS_KEY, ""
607: + socketAddress.getHostAddress());
608: connectionProperties.setProperty(
609: OILServerILFactory.OIL_TCPNODELAY_KEY,
610: enableTcpNoDelay ? "yes" : "no");
611:
612: bindJNDIReferences();
613:
614: }
615:
616: /**
617: * Stops this IL, and unbinds it from JNDI.
618: */
619: public void stopService() {
620: try {
621: unbindJNDIReferences();
622: } catch (Exception e) {
623: log.error("Exception unbinding from JNDI", e);
624: }
625: try {
626: running = false;
627: if (serverSocket != null)
628: serverSocket.close();
629: } catch (Exception e) {
630: log.debug("Exception stopping server thread", e);
631: }
632: }
633:
634: /**
635: * Get the OIL server listening port
636: *
637: * @return Value of property serverBindPort.
638: *
639: * @jmx:managed-attribute
640: */
641: public int getServerBindPort() {
642: return serverBindPort;
643: }
644:
645: /**
646: * Set the OIL server listening port
647: *
648: * @param serverBindPort New value of property serverBindPort.
649: *
650: * @jmx:managed-attribute
651: */
652: public void setServerBindPort(int serverBindPort) {
653: this .serverBindPort = serverBindPort;
654: }
655:
656: /**
657: * Get the interface address the OIL server bind its listening port on.
658: *
659: * @return The hostname or dotted decimal address that the service is
660: * bound to.
661: *
662: * @jmx:managed-attribute
663: */
664: public String getBindAddress() {
665: String addr = "0.0.0.0";
666: if (bindAddress != null)
667: addr = bindAddress.getHostName();
668: return addr;
669: }
670:
671: /**
672: * Set the interface address the OIL server bind its listening port on.
673: *
674: * @param host The host address to bind to, if any.
675: *
676: * @throws java.net.UnknownHostException Thrown if the hostname cannot
677: * be resolved to an InetAddress object.
678: *
679: * @jmx:managed-attribute
680: */
681: public void setBindAddress(String host) throws UnknownHostException {
682: // If host is null or empty use any address
683: if (host == null || host.length() == 0)
684: bindAddress = null;
685: else
686: bindAddress = InetAddress.getByName(host);
687: }
688:
689: /**
690: * Gets the enableTcpNoDelay.
691: * @return Returns a boolean
692: *
693: * @jmx:managed-attribute
694: */
695: public boolean getEnableTcpNoDelay() {
696: return enableTcpNoDelay;
697: }
698:
699: /**
700: * Sets the enableTcpNoDelay.
701: * @param enableTcpNoDelay The enableTcpNoDelay to set
702: *
703: * @jmx:managed-attribute
704: */
705: public void setEnableTcpNoDelay(boolean enableTcpNoDelay) {
706: this .enableTcpNoDelay = enableTcpNoDelay;
707: }
708:
709: /**
710: * Gets the socket read timeout.
711: * @return Returns the read timeout in milli-seconds
712: *
713: * @jmx:managed-attribute
714: */
715: public int getReadTimeout() {
716: return readTimeout;
717: }
718:
719: /**
720: * Sets the read time out.
721: * @param timeout The read time out in milli seconds
722: *
723: * @jmx:managed-attribute
724: */
725: public void setReadTimeout(int timeout) {
726: this .readTimeout = timeout;
727: }
728:
729: /** Get the javax.net.SocketFactory implementation class to use on the
730: *client.
731: * @jmx:managed-attribute
732: */
733: public String getClientSocketFactory() {
734: return clientSocketFactoryName;
735: }
736:
737: /** Set the javax.net.SocketFactory implementation class to use on the
738: *client.
739: * @jmx:managed-attribute
740: */
741: public void setClientSocketFactory(String name) {
742: this .clientSocketFactoryName = name;
743: }
744:
745: /** Set the javax.net.ServerSocketFactory implementation class to use to
746: *create the service SocketFactory.
747: *@jmx:managed-attribute
748: */
749: public void setServerSocketFactory(String name) throws Exception {
750: ClassLoader loader = Thread.currentThread()
751: .getContextClassLoader();
752: Class ssfClass = loader.loadClass(name);
753: serverSocketFactory = (ServerSocketFactory) ssfClass
754: .newInstance();
755: }
756:
757: /** Get the javax.net.ServerSocketFactory implementation class to use to
758: *create the service SocketFactory.
759: *@jmx:managed-attribute
760: */
761: public String getServerSocketFactory() {
762: String name = null;
763: if (serverSocketFactory != null)
764: name = serverSocketFactory.getClass().getName();
765: return name;
766: }
767:
768: /** Set the security domain name to use with SSL aware socket factories
769: *@jmx:managed-attribute
770: */
771: public void setSecurityDomain(String domainName) {
772: this .securityDomain = domainName;
773: }
774:
775: /** Get the security domain name to use with SSL aware socket factories
776: *@jmx:managed-attribute
777: */
778: public String getSecurityDomain() {
779: return this .securityDomain;
780: }
781: }
782: // vim:expandtab:tabstop=3:shiftwidth=3
|