001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ha.jndi;
023:
024: import java.io.IOException;
025: import java.io.ObjectOutputStream;
026: import java.io.OutputStream;
027: import java.lang.reflect.InvocationTargetException;
028: import java.lang.reflect.Method;
029: import java.lang.reflect.UndeclaredThrowableException;
030: import java.net.DatagramPacket;
031: import java.net.InetAddress;
032: import java.net.MulticastSocket;
033: import java.net.ServerSocket;
034: import java.net.Socket;
035: import java.net.UnknownHostException;
036: import java.rmi.MarshalledObject;
037: import java.util.Collections;
038: import java.util.HashMap;
039: import java.util.Map;
040: import java.util.Set;
041:
042: import javax.management.ObjectInstance;
043: import javax.management.ObjectName;
044: import javax.management.Query;
045: import javax.management.QueryExp;
046: import javax.net.ServerSocketFactory;
047:
048: import org.jboss.ha.framework.interfaces.HAPartition;
049: import org.jboss.ha.framework.server.ClusterPartition;
050: import org.jboss.ha.framework.server.ClusterPartitionMBean;
051: import org.jboss.invocation.Invocation;
052: import org.jboss.invocation.MarshalledInvocation;
053: import org.jboss.logging.Logger;
054: import org.jboss.mx.util.MBeanProxyExt;
055: import org.jboss.system.ServiceMBeanSupport;
056: import org.jboss.system.server.ServerConfigUtil;
057: import org.jboss.util.threadpool.BasicThreadPool;
058: import org.jboss.util.threadpool.BasicThreadPoolMBean;
059: import org.jboss.util.threadpool.ThreadPool;
060: import org.jnp.interfaces.Naming;
061: import org.jnp.interfaces.NamingContext;
062:
063: /**
064: * Management Bean for the protocol independent HA-JNDI service. This allows the
065: * naming service transport layer to be provided by a detached invoker service
066: * like JRMPInvokerHA + ProxyFactoryHA.
067: * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>
068: * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>
069: * @author Scott.Stark@jboss.org
070: * @version $Revision: 57188 $
071: */
072: public class DetachedHANamingService extends ServiceMBeanSupport
073: implements DetachedHANamingServiceMBean {
074: // Constants -----------------------------------------------------
075:
076: // Attributes ----------------------------------------------------
077: /**
078: * The jnp server socket through which the HAJNDI stub is vended
079: */
080: protected ServerSocket bootstrapSocket;
081:
082: /**
083: * The Naming interface server implementation
084: */
085: protected HAJNDI theServer;
086: /**
087: * The mapping from the long method hash to the Naming Method
088: */
089: protected Map marshalledInvocationMapping;
090: /**
091: * The protocol stub returned to clients by the bootstrap lookup
092: */
093: protected Naming stub;
094: /**
095: * The HAPartition used for the state transfer service
096: */
097: protected HAPartition partition;
098: /**
099: * The ClusterPartition with which we are associated.
100: */
101: protected ClusterPartitionMBean clusterPartition;
102: /**
103: * The partition name used to lookup the HAPartition binding
104: */
105: protected String partitionName = ServerConfigUtil
106: .getDefaultPartitionName();
107: /**
108: * The proxy factory service that generates the Naming stub
109: */
110: private ObjectName proxyFactory;
111:
112: /**
113: * The interface to bind to. This is useful for multi-homed hosts that want
114: * control over which interfaces accept connections.
115: */
116: protected InetAddress bindAddress;
117: /**
118: * The bootstrapSocket listen queue depth
119: */
120: protected int backlog = 50;
121: /**
122: * The jnp protocol listening port. The default is 1100, the same as the RMI
123: * registry default port.
124: */
125: protected int port = 1100;
126:
127: /**
128: * The autodiscovery multicast group
129: */
130: protected String adGroupAddress = NamingContext.DEFAULT_DISCOVERY_GROUP_ADDRESS;
131: /**
132: * The autodiscovery port
133: */
134: protected int adGroupPort = NamingContext.DEFAULT_DISCOVERY_GROUP_PORT;
135: /**
136: * The interface to bind the Multicast socket for autodiscovery to
137: */
138: protected InetAddress discoveryBindAddress;
139: /** The runable task for discovery request packets */
140: protected AutomaticDiscovery autoDiscovery = null;
141: /** A flag indicating if autodiscovery should be disabled */
142: protected boolean discoveryDisabled = false;
143: /** The autodiscovery Multicast reply TTL */
144: protected int autoDiscoveryTTL = 16;
145: /**
146: * An optional custom server socket factory for the bootstrap lookup
147: */
148: protected ServerSocketFactory jnpServerSocketFactory;
149: /**
150: * The class name of the optional custom JNP server socket factory
151: */
152: protected String jnpServerSocketFactoryName;
153:
154: /**
155: * The thread pool used to handle jnp stub lookup requests
156: */
157: protected ThreadPool lookupPool;
158:
159: // Public --------------------------------------------------------
160:
161: public DetachedHANamingService() {
162: // for JMX
163: }
164:
165: /**
166: * Expose the Naming service interface mapping as a read-only attribute
167: * @return A Map<Long hash, Method> of the Naming interface
168: * @jmx:managed-attribute
169: */
170: public Map getMethodMap() {
171: return marshalledInvocationMapping;
172: }
173:
174: public ClusterPartitionMBean getClusterPartition() {
175: return clusterPartition;
176: }
177:
178: public void setClusterPartition(
179: ClusterPartitionMBean clusterPartition) {
180: this .clusterPartition = clusterPartition;
181: }
182:
183: public String getPartitionName() {
184: return partitionName;
185: }
186:
187: public void setPartitionName(final String partitionName) {
188: this .partitionName = partitionName;
189: }
190:
191: public ObjectName getProxyFactoryObjectName() {
192: return proxyFactory;
193: }
194:
195: public void setProxyFactoryObjectName(ObjectName proxyFactory) {
196: this .proxyFactory = proxyFactory;
197: }
198:
199: public void setPort(int p) {
200: port = p;
201: }
202:
203: public int getPort() {
204: return port;
205: }
206:
207: public String getBindAddress() {
208: String address = null;
209: if (bindAddress != null)
210: address = bindAddress.getHostAddress();
211: return address;
212: }
213:
214: public void setBindAddress(String host)
215: throws java.net.UnknownHostException {
216: bindAddress = InetAddress.getByName(host);
217: }
218:
219: public int getBacklog() {
220: return backlog;
221: }
222:
223: public void setBacklog(int backlog) {
224: if (backlog <= 0)
225: backlog = 50;
226: this .backlog = backlog;
227: }
228:
229: public void setDiscoveryDisabled(boolean disable) {
230: this .discoveryDisabled = disable;
231: }
232:
233: public boolean getDiscoveryDisabled() {
234: return this .discoveryDisabled;
235: }
236:
237: public String getAutoDiscoveryAddress() {
238: return this .adGroupAddress;
239: }
240:
241: public void setAutoDiscoveryAddress(String adAddress) {
242: this .adGroupAddress = adAddress;
243: }
244:
245: public int getAutoDiscoveryGroup() {
246: return this .adGroupPort;
247: }
248:
249: public void setAutoDiscoveryGroup(int adGroup) {
250: this .adGroupPort = adGroup;
251: }
252:
253: public String getAutoDiscoveryBindAddress() {
254: String address = null;
255: if (discoveryBindAddress != null)
256: address = discoveryBindAddress.getHostAddress();
257: return address;
258: }
259:
260: public void setAutoDiscoveryBindAddress(String address)
261: throws UnknownHostException {
262: discoveryBindAddress = InetAddress.getByName(address);
263: }
264:
265: public int getAutoDiscoveryTTL() {
266: return autoDiscoveryTTL;
267: }
268:
269: public void setAutoDiscoveryTTL(int ttl) {
270: autoDiscoveryTTL = ttl;
271: }
272:
273: public void setJNPServerSocketFactory(String factoryClassName)
274: throws ClassNotFoundException, InstantiationException,
275: IllegalAccessException {
276: this .jnpServerSocketFactoryName = factoryClassName;
277: ClassLoader loader = Thread.currentThread()
278: .getContextClassLoader();
279: Class clazz = loader.loadClass(jnpServerSocketFactoryName);
280: jnpServerSocketFactory = (ServerSocketFactory) clazz
281: .newInstance();
282: }
283:
284: public void setLookupPool(BasicThreadPoolMBean poolMBean) {
285: lookupPool = poolMBean.getInstance();
286: }
287:
288: public void startService(HAPartition haPartition) throws Exception {
289: this .partition = haPartition;
290: this .startService();
291: }
292:
293: protected void createService() throws Exception {
294: boolean debug = log.isDebugEnabled();
295:
296: if (this .clusterPartition == null) {
297: partition = findHAPartitionWithName(partitionName);
298: } else {
299: partition = clusterPartition.getHAPartition();
300: partitionName = partition.getPartitionName();
301: }
302:
303: if (partition == null)
304: throw new IllegalStateException("Cannot find partition '"
305: + partitionName + "'");
306:
307: if (debug)
308: log.debug("Initializing HAJNDI server on partition: "
309: + partitionName);
310:
311: // Start HAJNDI service
312: theServer = new HAJNDI(partition);
313: log.debug("initialize HAJNDI");
314: theServer.init();
315:
316: // Build the Naming interface method map
317: HashMap tmpMap = new HashMap(13);
318: Method[] methods = Naming.class.getMethods();
319: for (int m = 0; m < methods.length; m++) {
320: Method method = methods[m];
321: Long hash = new Long(MarshalledInvocation
322: .calculateHash(method));
323: tmpMap.put(hash, method);
324: }
325: marshalledInvocationMapping = Collections
326: .unmodifiableMap(tmpMap);
327:
328: // share instance for in-vm discovery
329: NamingContext.setHANamingServerForPartition(partitionName,
330: theServer);
331: }
332:
333: protected void startService() throws Exception {
334: log.debug("Obtaining the transport proxy");
335: stub = this .getNamingProxy();
336: this .theServer.setHAStub(stub);
337: if (port >= 0) {
338: log.debug("Starting HAJNDI bootstrap listener");
339: initBootstrapListener();
340: }
341:
342: // Automatic Discovery for unconfigured clients
343: if (adGroupAddress != null && discoveryDisabled == false) {
344: try {
345: autoDiscovery = new AutomaticDiscovery();
346: autoDiscovery.start();
347: lookupPool.run(autoDiscovery);
348: } catch (Exception e) {
349: log.warn("Failed to start AutomaticDiscovery", e);
350: }
351: }
352: }
353:
354: protected void stopService() throws Exception {
355: // un-share instance for in-vm discovery
356: NamingContext.removeHANamingServerForPartition(partitionName);
357:
358: // Stop listener
359: ServerSocket s = bootstrapSocket;
360: bootstrapSocket = null;
361: if (s != null) {
362: log.debug("Closing the bootstrap listener");
363: s.close();
364: }
365:
366: // Stop HAJNDI service
367: log.debug("Stopping the HAJNDI service");
368: theServer.stop();
369:
370: log.debug("Stopping AutomaticDiscovery");
371: if (autoDiscovery != null && discoveryDisabled == false)
372: autoDiscovery.stop();
373: }
374:
375: protected void destroyService() throws Exception {
376: log.debug("Destroying the HAJNDI service");
377: theServer.destroy();
378: }
379:
380: /**
381: * Expose the Naming service via JMX to invokers.
382: * @param invocation A pointer to the invocation object
383: * @return Return value of method invocation.
384: * @throws Exception Failed to invoke method.
385: * @jmx:managed-operation
386: */
387: public Object invoke(Invocation invocation) throws Exception {
388: // Set the method hash to Method mapping
389: if (invocation instanceof MarshalledInvocation) {
390: MarshalledInvocation mi = (MarshalledInvocation) invocation;
391: mi.setMethodMap(marshalledInvocationMapping);
392: }
393: // Invoke the Naming method via reflection
394: Method method = invocation.getMethod();
395: Object[] args = invocation.getArguments();
396: Object value = null;
397: try {
398: value = method.invoke(theServer, args);
399: } catch (InvocationTargetException e) {
400: Throwable t = e.getTargetException();
401: if (t instanceof Exception)
402: throw (Exception) t;
403: else
404: throw new UndeclaredThrowableException(t, method
405: .toString());
406: }
407:
408: return value;
409: }
410:
411: /**
412: * Bring up the bootstrap lookup port for obtaining the naming service proxy
413: */
414: protected void initBootstrapListener() {
415: // Start listener
416: try {
417: // Get the default ServerSocketFactory is one was not specified
418: if (jnpServerSocketFactory == null)
419: jnpServerSocketFactory = ServerSocketFactory
420: .getDefault();
421: bootstrapSocket = jnpServerSocketFactory
422: .createServerSocket(port, backlog, bindAddress);
423: // If an anonymous port was specified get the actual port used
424: if (port == 0)
425: port = bootstrapSocket.getLocalPort();
426: String msg = "Started ha-jndi bootstrap jnpPort=" + port
427: + ", backlog=" + backlog + ", bindAddress="
428: + bindAddress;
429: log.info(msg);
430: } catch (IOException e) {
431: log.error("Could not start on port " + port, e);
432: }
433:
434: if (lookupPool == null)
435: lookupPool = new BasicThreadPool("HANamingBootstrap Pool");
436: AcceptHandler handler = new AcceptHandler();
437: lookupPool.run(handler);
438: }
439:
440: // Protected -----------------------------------------------------
441:
442: protected HAPartition findHAPartitionWithName(String name)
443: throws Exception {
444: HAPartition result = null;
445: QueryExp exp = Query.and(Query.eq(Query.classattr(), Query
446: .value(ClusterPartition.class.getName())), Query.match(
447: Query.attr("PartitionName"), Query.value(name)));
448:
449: Set mbeans = this .getServer().queryMBeans(null, exp);
450: if (mbeans != null && mbeans.size() > 0) {
451: ObjectInstance inst = (ObjectInstance) (mbeans.iterator()
452: .next());
453: ClusterPartitionMBean cp = (ClusterPartitionMBean) MBeanProxyExt
454: .create(ClusterPartitionMBean.class, inst
455: .getObjectName(), this .getServer());
456: result = cp.getHAPartition();
457: }
458:
459: return result;
460: }
461:
462: /**
463: * Get the Naming proxy for the transport. This version looks up the
464: * proxyFactory service Proxy attribute. Subclasses can override this to set
465: * the proxy another way.
466: * @return The Naming proxy for the protocol used with the HAJNDI service
467: */
468: protected Naming getNamingProxy() throws Exception {
469: Naming proxy = (Naming) server.getAttribute(proxyFactory,
470: "Proxy");
471: return proxy;
472: }
473:
474: // Private -------------------------------------------------------
475:
476: private class AutomaticDiscovery implements Runnable {
477: protected Logger log = Logger
478: .getLogger(AutomaticDiscovery.class);
479: /** The socket for auto discovery requests */
480: protected MulticastSocket socket = null;
481: /** The ha-jndi addres + ':' + port string */
482: protected byte[] ipAddress = null;
483: /** The multicast group address */
484: protected InetAddress group = null;
485: protected boolean stopping = false;
486: // Thread that is executing the run() method
487: protected Thread receiverThread = null;
488: protected boolean receiverStopped = true;
489:
490: public AutomaticDiscovery() throws Exception {
491: }
492:
493: public void start() throws Exception {
494: stopping = false;
495: // Use the jndi bind address if there is no discovery address
496: if (discoveryBindAddress == null)
497: discoveryBindAddress = bindAddress;
498: socket = new MulticastSocket(adGroupPort);
499: // If there is a bind address valid, set the socket interface to it
500: if (discoveryBindAddress != null
501: && discoveryBindAddress.isAnyLocalAddress() == false) {
502: socket.setInterface(discoveryBindAddress);
503: }
504: socket.setTimeToLive(autoDiscoveryTTL);
505: group = InetAddress.getByName(adGroupAddress);
506: socket.joinGroup(group);
507:
508: String address = getBindAddress();
509: /* An INADDR_ANY (0.0.0.0 || null) address is useless as the value
510: sent to a remote client so check for this and use the local host
511: address instead.
512: */
513: if (address == null || address.equals("0.0.0.0")) {
514: address = InetAddress.getLocalHost().getHostAddress();
515: }
516: ipAddress = (address + ":" + port).getBytes();
517:
518: log.info("Listening on " + socket.getInterface() + ":"
519: + socket.getLocalPort() + ", group="
520: + adGroupAddress + ", HA-JNDI address="
521: + new String(ipAddress));
522: }
523:
524: public void stop() {
525: try {
526: stopping = true;
527:
528: // JBAS-2834 -- try to stop the receiverThread
529: if (receiverThread != null
530: && receiverThread != Thread.currentThread()
531: && receiverThread.isInterrupted() == false) {
532: // Give it a moment to die on its own (unlikely)
533: receiverThread.join(5);
534: if (!receiverStopped)
535: receiverThread.interrupt(); // kill it
536: }
537:
538: socket.leaveGroup(group);
539: socket.close();
540: } catch (Exception ex) {
541: log.error("Stopping AutomaticDiscovery failed", ex);
542: }
543: }
544:
545: public void run() {
546: boolean trace = log.isTraceEnabled();
547: log.debug("Discovery request thread begin");
548:
549: // JBAS-2834 Cache a reference to this thread so stop()
550: // can interrupt it if necessary
551: receiverThread = Thread.currentThread();
552:
553: receiverStopped = false;
554:
555: // Wait for a datagram
556: while (true) {
557: // Stopped by normal means
558: if (stopping)
559: break;
560: try {
561: if (trace)
562: log
563: .trace("HA-JNDI AutomaticDiscovery waiting for queries...");
564: byte[] buf = new byte[256];
565: DatagramPacket packet = new DatagramPacket(buf,
566: buf.length);
567: socket.receive(packet);
568: if (trace)
569: log
570: .trace("HA-JNDI AutomaticDiscovery Packet received.");
571:
572: // Queue the response to the thread pool
573: DiscoveryRequestHandler handler = new DiscoveryRequestHandler(
574: log, packet, socket, ipAddress);
575: lookupPool.run(handler);
576: if (trace)
577: log.trace("Queued DiscoveryRequestHandler");
578: } catch (Throwable t) {
579: if (stopping == false)
580: log
581: .warn(
582: "Ignored error while processing HAJNDI discovery request:",
583: t);
584: }
585: }
586: receiverStopped = true;
587: log.debug("Discovery request thread end");
588: }
589: }
590:
591: /**
592: * The class used as the runnable for writing the bootstrap stub
593: */
594: private class DiscoveryRequestHandler implements Runnable {
595: private Logger log;
596: private MulticastSocket socket;
597: private DatagramPacket packet;
598: private byte[] ipAddress;
599:
600: DiscoveryRequestHandler(Logger log, DatagramPacket packet,
601: MulticastSocket socket, byte[] ipAddress) {
602: this .log = log;
603: this .packet = packet;
604: this .socket = socket;
605: this .ipAddress = ipAddress;
606: }
607:
608: public void run() {
609: boolean trace = log.isTraceEnabled();
610: if (trace)
611: log.trace("DiscoveryRequestHandler begin");
612: // Return the naming server IP address and port to the client
613: try {
614: // See if the discovery is restricted to a particular parition
615: String requestData = new String(packet.getData())
616: .trim();
617: if (trace)
618: log.trace("RequestData: " + requestData);
619: int colon = requestData.indexOf(':');
620: if (colon > 0) {
621: // Check the partition name
622: String name = requestData.substring(colon + 1);
623: if (name.equals(partitionName) == false) {
624: log
625: .debug("Ignoring discovery request for partition: "
626: + name);
627: if (trace)
628: log.trace("DiscoveryRequestHandler end");
629: return;
630: }
631: }
632: DatagramPacket p = new DatagramPacket(ipAddress,
633: ipAddress.length, packet.getAddress(), packet
634: .getPort());
635: if (trace)
636: log.trace("Sending AutomaticDiscovery answer: "
637: + new String(ipAddress));
638: socket.send(p);
639: if (trace)
640: log.trace("AutomaticDiscovery answer sent.");
641: } catch (IOException ex) {
642: log.error("Error writing response", ex);
643: }
644: if (trace)
645: log.trace("DiscoveryRequestHandler end");
646: }
647: }
648:
649: /**
650: * The class used as the runnable for the bootstrap lookup thread pool.
651: */
652: private class AcceptHandler implements Runnable {
653: public void run() {
654: boolean trace = log.isTraceEnabled();
655: while (bootstrapSocket != null) {
656: Socket socket = null;
657: // Accept a connection
658: try {
659: socket = bootstrapSocket.accept();
660: if (trace)
661: log.trace("Accepted bootstrap client: "
662: + socket);
663: BootstrapRequestHandler handler = new BootstrapRequestHandler(
664: socket);
665: lookupPool.run(handler);
666: } catch (IOException e) {
667: // Stopped by normal means
668: if (bootstrapSocket == null)
669: return;
670: log.error("Naming accept handler stopping", e);
671: } catch (Throwable e) {
672: log.error("Unexpected exception during accept", e);
673: }
674: }
675: }
676: }
677:
678: /**
679: * The class used as the runnable for writing the bootstrap stub
680: */
681: private class BootstrapRequestHandler implements Runnable {
682: private Socket socket;
683:
684: BootstrapRequestHandler(Socket socket) {
685: this .socket = socket;
686: }
687:
688: public void run() {
689: // Return the naming server stub
690: try {
691: OutputStream os = socket.getOutputStream();
692: ObjectOutputStream out = new ObjectOutputStream(os);
693: MarshalledObject replyStub = new MarshalledObject(stub);
694: out.writeObject(replyStub);
695: out.close();
696: } catch (IOException ex) {
697: log.debug("Error writing response to " + socket, ex);
698: } finally {
699: try {
700: socket.close();
701: } catch (IOException e) {
702: }
703: }
704: }
705: }
706: }
|