001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ha.jmx;
023:
024: import java.io.Serializable;
025: import java.util.List;
026: import java.util.Set;
027:
028: import javax.management.AttributeChangeNotification;
029: import javax.management.InstanceNotFoundException;
030: import javax.management.Notification;
031: import javax.management.ObjectInstance;
032: import javax.management.Query;
033: import javax.management.QueryExp;
034:
035: import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
036: import org.jboss.ha.framework.interfaces.DistributedState;
037: import org.jboss.ha.framework.interfaces.HAPartition;
038: import org.jboss.ha.framework.server.ClusterPartition;
039: import org.jboss.ha.framework.server.ClusterPartitionMBean;
040: import org.jboss.mx.util.MBeanProxyExt;
041: import org.jboss.system.ServiceMBeanSupport;
042: import org.jboss.system.server.ServerConfigUtil;
043:
044: /**
045: *
046: * Management Bean for an HA-Service.
047: * Provides a convenient common base for cluster symmetric MBeans.
048: *
049: * This class is also a user transparent extension
050: * of the standard NotificationBroadcasterSupport
051: * to a clustered environment.
052: * Listeners register with their local broadcaster.
053: * Invoking sendNotification() on any broadcaster,
054: * will notify all listeners in the same cluster partition.
055: * TODO: The performance can be further optimized by avoiding broadcast messages
056: * when remote listener nodes are not interested (e.g. have no local subscribers)
057: * or by iterating locally over filters or remote listeners.
058: *
059: * @author Ivelin Ivanov <ivelin@apache.org> *
060: * @version $Revision: 57188 $
061: *
062: */
063: public class HAServiceMBeanSupport extends ServiceMBeanSupport
064: implements HAServiceMBean {
065: // Constants -----------------------------------------------------
066:
067: // Attributes ----------------------------------------------------
068: private HAPartition partition_;
069: private ClusterPartitionMBean clusterPartition;
070: private String partitionName = ServerConfigUtil
071: .getDefaultPartitionName();
072:
073: private DistributedReplicantManager.ReplicantListener drmListener = null;
074:
075: /**
076: * DRM participation TOKEN
077: */
078: private String REPLICANT_TOKEN = "";
079:
080: private boolean sendLocalLifecycleNotifications = true;
081: private boolean sendRemoteLifecycleNotifications = true;
082:
083: // Public --------------------------------------------------------
084:
085: public HAServiceMBeanSupport() {
086: // for JMX
087: }
088:
089: public ClusterPartitionMBean getClusterPartition() {
090: return clusterPartition;
091: }
092:
093: public void setClusterPartition(
094: ClusterPartitionMBean clusterPartition) {
095: if ((getState() != STARTED) && (getState() != STARTING)) {
096: this .clusterPartition = clusterPartition;
097: }
098: }
099:
100: public String getPartitionName() {
101: return partitionName;
102: }
103:
104: public void setPartitionName(String newPartitionName) {
105: if ((getState() != STARTED) && (getState() != STARTING)) {
106: partitionName = newPartitionName;
107: }
108: }
109:
110: // Protected ------------------------------
111:
112: /**
113: *
114: *
115: * Convenience method for sharing state across a cluster partition.
116: * Delegates to the DistributedStateService
117: *
118: * @param key key for the distributed object
119: * @param value the distributed object
120: *
121: */
122: public void setDistributedState(String key, Serializable value)
123: throws Exception {
124: DistributedState ds = getPartition()
125: .getDistributedStateService();
126: ds.set(getServiceHAName(), key, value);
127: }
128:
129: /**
130: *
131: * Convenience method for sharing state across a cluster partition.
132: * Delegates to the DistributedStateService
133: *
134: * @param key key for the distributed object
135: * @return Serializable the distributed object
136: *
137: */
138: public Serializable getDistributedState(String key) {
139: DistributedState ds = getPartition()
140: .getDistributedStateService();
141: return ds.get(getServiceHAName(), key);
142: }
143:
144: /**
145: * <p>
146: * Implementors of this method should not
147: * code the singleton logic here.
148: * The MBean lifecycle create/start/stop are separate from
149: * the singleton logic.
150: * Singleton logic should originate in becomeMaster().
151: * </p>
152: *
153: * <p>
154: * <b>Attention</b>: Always call this method when you overwrite it in a subclass
155: * because it elects the master singleton node.
156: * </p>
157: *
158: */
159: protected void startService() throws Exception {
160: boolean debug = log.isDebugEnabled();
161: if (debug)
162: log.debug("start HAServiceMBeanSupport");
163:
164: setupPartition();
165:
166: registerRPCHandler();
167:
168: registerDRMListener();
169: }
170:
171: /**
172: * <p>
173: * <b>Attention</b>: Always call this method when you overwrite it in a subclass
174: * </p>
175: *
176: */
177: protected void stopService() throws Exception {
178: boolean debug = log.isDebugEnabled();
179: if (debug)
180: log.debug("stop HAServiceMBeanSupport");
181:
182: unregisterDRMListener();
183:
184: unregisterRPCHandler();
185: }
186:
187: protected void setupPartition() throws Exception {
188: // get handle to the cluster partition
189: if (clusterPartition == null) {
190: String pName = getPartitionName();
191: partition_ = findHAPartitionWithName(pName);
192: } else {
193: partition_ = clusterPartition.getHAPartition();
194: partitionName = partition_.getPartitionName();
195: }
196: }
197:
198: protected void registerRPCHandler() {
199: partition_.registerRPCHandler(getServiceHAName(), this );
200: }
201:
202: protected void unregisterRPCHandler() {
203: partition_.unregisterRPCHandler(getServiceHAName(), this );
204: }
205:
206: protected void registerDRMListener() throws Exception {
207: DistributedReplicantManager drm = this .partition_
208: .getDistributedReplicantManager();
209:
210: // register to listen to topology changes, which might cause the election of a new master
211: drmListener = new DistributedReplicantManager.ReplicantListener() {
212: Object mutex = new Object();
213:
214: public void replicantsChanged(String key,
215: List newReplicants, int newReplicantsViewId) {
216: if (key.equals(getServiceHAName())) {
217: // This synchronized block was added when the internal behavior of
218: // DistributedReplicantManagerImpl was changed so that concurrent
219: // replicantsChanged notifications are possible. Synchronization
220: // ensures that this change won't break non-thread-safe
221: // subclasses of HAServiceMBeanSupport.
222: synchronized (mutex) {
223: // change in the topology callback
224: HAServiceMBeanSupport.this
225: .partitionTopologyChanged(
226: newReplicants,
227: newReplicantsViewId);
228: }
229: }
230: }
231: };
232: drm.registerListener(getServiceHAName(), drmListener);
233:
234: // this ensures that the DRM knows that this node has the MBean deployed
235: drm.add(getServiceHAName(), REPLICANT_TOKEN);
236: }
237:
238: protected void unregisterDRMListener() throws Exception {
239: DistributedReplicantManager drm = this .partition_
240: .getDistributedReplicantManager();
241:
242: // remove replicant node
243: drm.remove(getServiceHAName());
244:
245: // unregister
246: drm.unregisterListener(getServiceHAName(), drmListener);
247: }
248:
249: public void partitionTopologyChanged(List newReplicants,
250: int newReplicantsViewId) {
251: boolean debug = log.isDebugEnabled();
252:
253: if (debug) {
254: log.debug("partitionTopologyChanged(). cluster view id: "
255: + newReplicantsViewId);
256: }
257: }
258:
259: protected boolean isDRMMasterReplica() {
260: DistributedReplicantManager drm = getPartition()
261: .getDistributedReplicantManager();
262:
263: return drm.isMasterReplica(getServiceHAName());
264: }
265:
266: public HAPartition getPartition() {
267: return partition_;
268: }
269:
270: /**
271: *
272: * @param methodName
273: * @param args
274: * @throws Exception
275: * @deprecated Use {@link #callMethodOnPartition(String, Object[], Class[])} instead
276: */
277: public void callMethodOnPartition(String methodName, Object[] args)
278: throws Exception {
279: getPartition().callMethodOnCluster(getServiceHAName(),
280: methodName, args, true);
281: }
282:
283: public void callMethodOnPartition(String methodName, Object[] args,
284: Class[] types) throws Exception {
285: getPartition().callMethodOnCluster(getServiceHAName(),
286: methodName, args, types, true);
287: }
288:
289: /**
290: * Gets whether JMX Notifications should be sent to local (same JVM) listeners
291: * if the notification is for an attribute change to attribute "State".
292: * <p>
293: * Default is <code>true</code>.
294: * </p>
295: * @see #sendNotification(Notification)
296: */
297: public boolean getSendLocalLifecycleNotifications() {
298: return sendLocalLifecycleNotifications;
299: }
300:
301: /**
302: * Sets whether JMX Notifications should be sent to local (same JVM) listeners
303: * if the notification is for an attribute change to attribute "State".
304: * <p>
305: * Default is <code>true</code>.
306: * </p>
307: * @see #sendNotification(Notification)
308: */
309: public void setSendLocalLifecycleNotifications(
310: boolean sendLocalLifecycleNotifications) {
311: this .sendLocalLifecycleNotifications = sendLocalLifecycleNotifications;
312: }
313:
314: /**
315: * Gets whether JMX Notifications should be sent to remote listeners
316: * if the notification is for an attribute change to attribute "State".
317: * <p>
318: * Default is <code>true</code>.
319: * </p>
320: * <p>
321: * See http://jira.jboss.com/jira/browse/JBAS-3194 for an example of a
322: * use case where this property should be set to false.
323: * </p>
324: *
325: * @see #sendNotification(Notification)
326: */
327: public boolean getSendRemoteLifecycleNotifications() {
328: return sendRemoteLifecycleNotifications;
329: }
330:
331: /**
332: * Sets whether JMX Notifications should be sent to remote listeners
333: * if the notification is for an attribute change to attribute "State".
334: * <p>
335: * Default is <code>true</code>.
336: * </p>
337: * <p>
338: * See http://jira.jboss.com/jira/browse/JBAS-3194 for an example of a
339: * use case where this property should be set to false.
340: * </p>
341: *
342: * @see #sendNotification(Notification)
343: */
344: public void setSendRemoteLifecycleNotifications(
345: boolean sendRemoteLifecycleNotifications) {
346: this .sendRemoteLifecycleNotifications = sendRemoteLifecycleNotifications;
347: }
348:
349: /**
350: * Broadcast the notification to the remote listener nodes (if any) and then
351: * invoke super.sendNotification() to notify local listeners.
352: *
353: * @param notification sent out to local listeners and other nodes. It should be serializable.
354: * It is recommended that the source of the notification is an ObjectName of an MBean that
355: * is is available on all nodes where the broadcaster MBean is registered.
356: *
357: * @see #getSendLocalLifecycleNotifications()
358: * @see #getSendRemoteLifecycleNotifications()
359: * @see javax.management.NotificationBroadcasterSupport#sendNotification(Notification)
360: */
361: public void sendNotification(Notification notification) {
362: boolean stateChange = isStateChangeNotification(notification);
363:
364: if (!stateChange || sendRemoteLifecycleNotifications) {
365: try {
366: // Overriding the source MBean with its ObjectName
367: // to ensure that it can be safely transferred over the wire
368: notification.setSource(this .getServiceName());
369: sendNotificationRemote(notification);
370: }
371:
372: catch (Throwable th) {
373: boolean debug = log.isDebugEnabled();
374: if (debug)
375: log.debug("sendNotificationRemote( " + notification
376: + " ) failed ", th);
377: // even if broadcast failed, local notification should still be sent
378:
379: }
380: }
381:
382: if (!stateChange || sendLocalLifecycleNotifications) {
383: sendNotificationToLocalListeners(notification);
384: }
385: }
386:
387: private boolean isStateChangeNotification(Notification notification) {
388: boolean stateChange = false;
389: if (notification instanceof AttributeChangeNotification) {
390: stateChange = "State"
391: .equals(((AttributeChangeNotification) notification)
392: .getAttributeName());
393: }
394: return stateChange;
395: }
396:
397: protected void sendNotificationToLocalListeners(
398: Notification notification) {
399: super .sendNotification(notification);
400: }
401:
402: protected void callAsyncMethodOnPartition(String methodName,
403: Object[] args, Class[] types) throws Exception {
404: HAPartition partition = getPartition();
405: if (partition != null) {
406: getPartition().callAsynchMethodOnCluster(
407: getServiceHAName(), methodName, args, types, true);
408: }
409: }
410:
411: /**
412: *
413: * Broadcast a notifcation remotely to the partition participants
414: *
415: * @param notification
416: */
417: protected void sendNotificationRemote(Notification notification)
418: throws Exception {
419: callAsyncMethodOnPartition("_receiveRemoteNotification",
420: new Object[] { notification },
421: new Class[] { Notification.class });
422: }
423:
424: /**
425: *
426: * Invoked by remote broadcasters.
427: * Delegates to the super class
428: *
429: */
430: public void _receiveRemoteNotification(Notification notification) {
431: super .sendNotification(notification);
432: }
433:
434: /**
435: *
436: * Override this method only if you need to provide a custom partition wide unique service name.
437: * The default implementation will usually work, provided that
438: * the getServiceName() method returns a unique canonical MBean name.
439: *
440: * @return partition wide unique service name
441: */
442: public String getServiceHAName() {
443: return getServiceName().getCanonicalName();
444: }
445:
446: protected HAPartition findHAPartitionWithName(String name)
447: throws Exception {
448: log.debug("findHAPartitionWithName, name=" + name);
449: HAPartition result = null;
450: QueryExp classEQ = Query.eq(Query.classattr(), Query
451: .value(ClusterPartition.class.getName()));
452: QueryExp matchPartitionName = Query.match(Query
453: .attr("PartitionName"), Query.value(name));
454: QueryExp exp = Query.and(classEQ, matchPartitionName);
455: Set mbeans = this .getServer().queryMBeans(null, exp);
456: if (mbeans != null && mbeans.size() > 0) {
457: ObjectInstance inst = (ObjectInstance) (mbeans.iterator()
458: .next());
459: ClusterPartitionMBean cp = (ClusterPartitionMBean) MBeanProxyExt
460: .create(ClusterPartitionMBean.class, inst
461: .getObjectName(), this .getServer());
462: result = cp.getHAPartition();
463: }
464:
465: if (result == null) {
466: String msg = "Failed to find HAPartition with PartitionName="
467: + name;
468: throw new InstanceNotFoundException(msg);
469: }
470: return result;
471: }
472:
473: // Private -------------------------------------------------------
474:
475: // Inner classes -------------------------------------------------
476:
477: }
|