001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ha.framework.server;
023:
024: import java.lang.reflect.Method;
025: import java.net.InetAddress;
026: import java.rmi.dgc.VMID;
027: import java.rmi.server.UID;
028: import java.util.Iterator;
029: import java.util.Set;
030: import java.util.Vector;
031:
032: import javax.management.Attribute;
033: import javax.management.AttributeList;
034: import javax.management.InstanceNotFoundException;
035: import javax.management.MBeanServer;
036: import javax.management.MalformedObjectNameException;
037: import javax.management.ObjectName;
038: import javax.management.ReflectionException;
039:
040: import org.jboss.ha.framework.interfaces.HAPartition;
041: import org.jboss.naming.NamingServiceMBean;
042: import org.jboss.system.ServiceMBean;
043: import org.jboss.system.ServiceMBeanSupport;
044: import org.jboss.system.server.ServerConfigUtil;
045: import org.jgroups.Channel;
046: import org.jgroups.JChannel;
047: import org.jgroups.Version;
048: import org.jgroups.debug.Debugger;
049: import org.jgroups.jmx.JChannelFactoryMBean;
050: import org.jgroups.jmx.JmxConfigurator;
051: import org.w3c.dom.Attr;
052: import org.w3c.dom.Element;
053: import org.w3c.dom.NamedNodeMap;
054: import org.w3c.dom.Node;
055: import org.w3c.dom.NodeList;
056:
057: /**
058: * Management Bean for Cluster HAPartitions. It will start a JGroups
059: * channel and initialize the ReplicantManager and DistributedStateService.
060: *
061: * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
062: * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
063: * @version $Revision: 62947 $
064: */
065: public class ClusterPartition extends ServiceMBeanSupport implements
066: ClusterPartitionMBean {
067: // Constants -----------------------------------------------------
068:
069: public static final String JGROUPS_JMX_DOMAIN = "jboss.jgroups";
070: public static final String CHANNEL_JMX_ATTRIBUTES = "type=channel,cluster=";
071: public static final String PROTOCOL_JMX_ATTRIBUTES = "type=protocol,cluster=";
072:
073: // Attributes ----------------------------------------------------
074:
075: protected String partitionName = ServerConfigUtil
076: .getDefaultPartitionName();
077: protected String jgProps = "UDP(mcast_addr=228.1.2.3;mcast_port=45566;ip_ttl=64;"
078: + "mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
079: + "PING(timeout=2000;num_initial_members=3):"
080: + "MERGE2(min_interval=5000;max_interval=10000):"
081: + "FD:"
082: + "VERIFY_SUSPECT(timeout=1500):"
083: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):"
084: + "UNICAST(timeout=600,1200,2400):"
085: + "pbcast.STABLE(desired_avg_gossip=20000):"
086: + "FRAG(down_thread=false;up_thread=false;frag_size=8192):"
087: + "pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;"
088: + "shun=false;print_local_addr=true):"
089: + "pbcast.STATE_TRANSFER";
090:
091: protected HAPartitionImpl partition;
092: protected boolean deadlock_detection = false;
093: protected boolean allow_sync_events = false;
094: protected JChannelFactoryMBean multiplexer = null;
095: protected String stackName = null;
096: protected org.jgroups.JChannel channel;
097: protected Debugger debugger = null;
098: protected boolean use_debugger = false;
099:
100: protected String nodeName = null;
101: protected InetAddress nodeAddress = null;
102:
103: /** Number of milliseconds to wait until state has been transferred. Increase this value for large states
104: * 0 = wait forever
105: */
106: protected long state_transfer_timeout = 60000;
107:
108: protected long method_call_timeout = 60000;
109:
110: protected boolean channelRegistered;
111: protected boolean protocolsRegistered;
112:
113: // Static --------------------------------------------------------
114:
115: // Constructors --------------------------------------------------
116:
117: // Public --------------------------------------------------------
118:
119: // ClusterPartitionMBean implementation ----------------------------------------------
120:
121: public String getPartitionName() {
122: return partitionName;
123: }
124:
125: public void setPartitionName(String newName) {
126: partitionName = newName;
127: }
128:
129: public String getPartitionProperties() {
130: // The channel knows best
131: if (channel != null)
132: return channel.getProperties();
133:
134: if (multiplexer == null && stackName == null)
135: return jgProps;
136:
137: // We are configured for the multiplexer but don't know
138: // the details of the stack yet
139: return null;
140: }
141:
142: public void setPartitionProperties(String newProps) {
143: jgProps = newProps;
144: }
145:
146: /** Convert a list of elements to the JG property string
147: */
148: public void setPartitionConfig(Element config) {
149: StringBuffer buffer = new StringBuffer();
150: NodeList stack = config.getChildNodes();
151: int length = stack.getLength();
152: for (int s = 0; s < length; s++) {
153: Node node = stack.item(s);
154: if (node.getNodeType() != Node.ELEMENT_NODE)
155: continue;
156:
157: Element tag = (Element) node;
158: String protocol = tag.getTagName();
159: buffer.append(protocol);
160: NamedNodeMap attrs = tag.getAttributes();
161: int attrLength = attrs.getLength();
162: if (attrLength > 0)
163: buffer.append('(');
164: for (int a = 0; a < attrLength; a++) {
165: Attr attr = (Attr) attrs.item(a);
166: String name = attr.getName();
167: String value = attr.getValue();
168: buffer.append(name);
169: buffer.append('=');
170: buffer.append(value);
171: if (a < attrLength - 1)
172: buffer.append(';');
173: }
174: if (attrLength > 0)
175: buffer.append(')');
176: buffer.append(':');
177: }
178: // Remove the trailing ':'
179: buffer.setLength(buffer.length() - 1);
180: this .jgProps = buffer.toString();
181: log.debug("Setting JGProps from xml to: " + jgProps);
182: }
183:
184: /**
185: * Uniquely identifies this node. MUST be unique accros the whole cluster!
186: * Cannot be changed once the partition has been started
187: */
188: public String getNodeName() {
189: return this .nodeName;
190: }
191:
192: public void setNodeName(String node) throws Exception {
193: if (this .getState() == ServiceMBean.CREATED
194: || this .getState() == ServiceMBean.STARTED
195: || this .getState() == ServiceMBean.STARTING) {
196: throw new Exception(
197: "Node name cannot be changed once the partition has been started");
198: } else {
199: this .nodeName = node;
200: }
201: }
202:
203: public InetAddress getNodeAddress() {
204: return nodeAddress;
205: }
206:
207: public void setNodeAddress(InetAddress address) {
208: this .nodeAddress = address;
209: }
210:
211: public String getJGroupsVersion() {
212: return Version.version + "( " + Version.cvs + ")";
213: }
214:
215: public long getStateTransferTimeout() {
216: return state_transfer_timeout;
217: }
218:
219: public void setStateTransferTimeout(long timeout) {
220: this .state_transfer_timeout = timeout;
221: }
222:
223: public long getMethodCallTimeout() {
224: return method_call_timeout;
225: }
226:
227: public void setMethodCallTimeout(long timeout) {
228: this .method_call_timeout = timeout;
229: }
230:
231: public JChannelFactoryMBean getMultiplexer() {
232: return multiplexer;
233: }
234:
235: public void setMultiplexer(JChannelFactoryMBean muxFactory) {
236: this .multiplexer = muxFactory;
237: }
238:
239: public String getMultiplexerStack() {
240: return stackName;
241: }
242:
243: public void setMultiplexerStack(String stackName) {
244: this .stackName = stackName;
245: }
246:
247: // public boolean getChannelDebugger()
248: // {
249: // return this.use_debugger;
250: // }
251: //
252: // public void setChannelDebugger(boolean flag)
253: // {
254: // this.use_debugger=flag;
255: // }
256:
257: public boolean getDeadlockDetection() {
258: return deadlock_detection;
259: }
260:
261: public void setDeadlockDetection(boolean doit) {
262: deadlock_detection = doit;
263: }
264:
265: public boolean getAllowSynchronousMembershipNotifications() {
266: return allow_sync_events;
267: }
268:
269: public void setAllowSynchronousMembershipNotifications(
270: boolean allowSync) {
271: this .allow_sync_events = allowSync;
272: }
273:
274: protected ObjectName getObjectName(MBeanServer server,
275: ObjectName name) throws MalformedObjectNameException {
276: return name == null ? OBJECT_NAME : name;
277: }
278:
279: public HAPartition getHAPartition() {
280: return this .partition;
281: }
282:
283: /** Return the list of member nodes that built from the current view
284: * @return A Vector Strings representing the host:port values of the nodes
285: */
286: public Vector getCurrentView() {
287: return partition.getCurrentView();
288: }
289:
290: // ServiceMBeanSupport overrides ---------------------------------------------------
291:
292: public String getName() {
293: return partitionName;
294: }
295:
296: protected void createService() throws Exception {
297: log.debug("Creating JGroups JChannel");
298:
299: if (stackName != null && multiplexer != null) {
300: this .channel = (JChannel) multiplexer
301: .createMultiplexerChannel(stackName,
302: getPartitionName());
303: } else {
304: this .channel = new org.jgroups.JChannel(jgProps);
305: // JBAS-4406 Hack to register the channel
306: registerChannelInJmx();
307: }
308:
309: if (use_debugger && debugger == null) {
310: debugger = new Debugger(channel);
311: debugger.start();
312: }
313: channel.setOpt(Channel.AUTO_RECONNECT, Boolean.TRUE);
314: channel.setOpt(Channel.AUTO_GETSTATE, Boolean.TRUE);
315:
316: log.debug("Creating HAPartition");
317: partition = createPartition();
318:
319: // JBAS-2769 Init partition in create
320: log.debug("Initing HAPartition: " + partition);
321: partition.init();
322: log.debug("HAPartition initialized");
323:
324: }
325:
326: /**
327: * Extension point meant for test cases; instantiates the HAPartitionImpl.
328: * Test cases can instantiate their own subclass of HAPartitionImpl.
329: */
330: protected HAPartitionImpl createPartition() throws Exception {
331: HAPartitionImpl result = new HAPartitionImpl(partitionName,
332: channel, deadlock_detection, getServer());
333: result.setStateTransferTimeout(this .state_transfer_timeout);
334: result.setMethodCallTimeout(this .method_call_timeout);
335: return result;
336: }
337:
338: protected void startService() throws Exception {
339: // We push the independant name in the protocol stack
340: // before it is connected to the cluster
341: //
342: if (this .nodeName == null || "".equals(this .nodeName))
343: this .nodeName = generateUniqueNodeName();
344:
345: java.util.HashMap staticNodeName = new java.util.HashMap();
346: staticNodeName.put("additional_data", this .nodeName.getBytes());
347:
348: // JBAS-4258 -- invoke via reflection to allow upgrade to JGroups 2.5
349: Class[] paramTypes = new Class[] { org.jgroups.Event.class };
350: Method downMethod = JChannel.class.getDeclaredMethod("down",
351: paramTypes);
352: Object[] params = { new org.jgroups.Event(
353: org.jgroups.Event.CONFIG, staticNodeName) };
354: downMethod.invoke(channel, params);
355:
356: this .channel.getProtocolStack().flushEvents(); // temporary fix for JG bug (808170) TODO: REMOVE ONCE JGROUPS IS FIXED
357:
358: log.debug("Starting ClusterPartition: " + partitionName);
359: channel.connect(partitionName);
360:
361: try {
362: log.debug("Starting channel");
363: partition.startPartition();
364:
365: log.debug("Started ClusterPartition: " + partitionName);
366: } catch (Exception e) {
367: log
368: .debug("Caught exception after channel connected; closing channel -- "
369: + e.getLocalizedMessage());
370: channel.disconnect();
371: throw e;
372: }
373: }
374:
375: protected void stopService() throws Exception {
376: stopChannelDebugger();
377: log.debug("Stopping ClusterPartition: " + partitionName);
378: partition.closePartition();
379: log.debug("Stopped ClusterPartition: " + partitionName);
380: }
381:
382: // NR 200505 : [JBCLUSTER-38] close partition just disconnect from channel
383: // destroy close it.
384: protected void destroyService() throws Exception {
385: log.debug("Destroying ClusterPartition: " + partitionName);
386: partition.destroyPartition();
387: // JBAS-4406 Hack
388: unregisterChannelFromJmx();
389: log.debug("Destroyed ClusterPartition: " + partitionName);
390: }
391:
392: protected String generateUniqueNodeName() throws Exception {
393: // we first try to find a simple meaningful name:
394: // 1st) "local-IP:JNDI_PORT" if JNDI is running on this machine
395: // 2nd) "local-IP:JMV_GUID" otherwise
396: // 3rd) return a fully GUID-based representation
397: //
398:
399: // Before anything we determine the local host IP (and NOT name as this could be
400: // resolved differently by other nodes...)
401:
402: // But use the specified node address for multi-homing
403:
404: String hostIP = null;
405: InetAddress address = ServerConfigUtil
406: .fixRemoteAddress(nodeAddress);
407: if (address == null) {
408: log
409: .debug("unable to create a GUID for this cluster, check network configuration is correctly setup (getLocalHost has returned an exception)");
410: log.debug("using a full GUID strategy");
411: return new VMID().toString();
412: } else {
413: hostIP = address.getHostAddress();
414: }
415:
416: // 1st: is JNDI up and running?
417: //
418: try {
419: AttributeList al = this .server.getAttributes(
420: NamingServiceMBean.OBJECT_NAME, new String[] {
421: "State", "Port" });
422:
423: int status = ((Integer) ((Attribute) al.get(0)).getValue())
424: .intValue();
425: if (status == ServiceMBean.STARTED) {
426: // we can proceed with the JNDI trick!
427: int port = ((Integer) ((Attribute) al.get(1))
428: .getValue()).intValue();
429: return hostIP + ":" + port;
430: } else {
431: log
432: .debug("JNDI has been found but the service wasn't started so we cannot "
433: + "be entirely sure we are the only one that wants to use this PORT "
434: + "as a GUID on this host.");
435: }
436:
437: } catch (InstanceNotFoundException e) {
438: log
439: .debug("JNDI not running here, cannot use this strategy to find a node GUID for the cluster");
440: } catch (ReflectionException e) {
441: log
442: .debug("JNDI querying has returned an exception, cannot use this strategy to find a node GUID for the cluster");
443: }
444:
445: // 2nd: host-GUID strategy
446: //
447: String uid = new UID().toString();
448: return hostIP + ":" + uid;
449: }
450:
451: public String showHistory() {
452: StringBuffer buff = new StringBuffer();
453: Vector data = new Vector(this .partition.history);
454: for (java.util.Iterator row = data.iterator(); row.hasNext();) {
455: String info = (String) row.next();
456: buff.append(info).append("\n");
457: }
458: return buff.toString();
459: }
460:
461: public String showHistoryAsXML() {
462: StringBuffer buff = new StringBuffer();
463: buff.append("<events>\n");
464: Vector data = new Vector(this .partition.history);
465: for (java.util.Iterator row = data.iterator(); row.hasNext();) {
466: buff.append(" <event>\n ");
467: String info = (String) row.next();
468: buff.append(info);
469: buff.append("\n </event>\n");
470: }
471: buff.append("</events>\n");
472: return buff.toString();
473: }
474:
475: public void startChannelDebugger() {
476: startChannelDebugger(false);
477: }
478:
479: public void startChannelDebugger(boolean accumulative) {
480: if (debugger == null) {
481: debugger = new Debugger(this .channel, accumulative);
482: debugger.start();
483: }
484: }
485:
486: public void stopChannelDebugger() {
487: if (debugger != null) {
488: // debugger.stop(); // uncomment when new JGroups version is available
489: debugger = null;
490: }
491: }
492:
493: protected void registerChannelInJmx() {
494: if (server != null) {
495: try {
496: String protocolPrefix = JGROUPS_JMX_DOMAIN + ":"
497: + PROTOCOL_JMX_ATTRIBUTES + getPartitionName();
498: JmxConfigurator.registerProtocols(server, channel,
499: protocolPrefix);
500: protocolsRegistered = true;
501:
502: String name = JGROUPS_JMX_DOMAIN + ":"
503: + CHANNEL_JMX_ATTRIBUTES + getPartitionName();
504: JmxConfigurator.registerChannel(channel, server, name);
505: channelRegistered = true;
506: } catch (Exception e) {
507: log.error(
508: "Caught exception registering channel in JXM",
509: e);
510: }
511: }
512: }
513:
514: protected void unregisterChannelFromJmx() {
515: ObjectName on = null;
516: if (channelRegistered) {
517: // Unregister the channel itself
518: try {
519: on = new ObjectName(JGROUPS_JMX_DOMAIN + ":"
520: + CHANNEL_JMX_ATTRIBUTES + getPartitionName());
521: server.unregisterMBean(on);
522: } catch (Exception e) {
523: if (on != null)
524: log.error(
525: "Caught exception unregistering channel at "
526: + on, e);
527: else
528: log.error("Caught exception unregistering channel",
529: e);
530: }
531: }
532:
533: if (protocolsRegistered) {
534: // Unregister the protocols
535: try {
536: on = new ObjectName(JGROUPS_JMX_DOMAIN + ":*,"
537: + PROTOCOL_JMX_ATTRIBUTES + getPartitionName());
538: Set mbeans = server.queryNames(on, null);
539: if (mbeans != null) {
540: for (Iterator it = mbeans.iterator(); it.hasNext();) {
541: server.unregisterMBean((ObjectName) it.next());
542: }
543: }
544: } catch (Exception e) {
545: if (on != null)
546: log.error(
547: "Caught exception unregistering protocols at "
548: + on, e);
549: else
550: log.error(
551: "Caught exception unregistering protocols",
552: e);
553: }
554: }
555: }
556: }
|