001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.cache.invalidation.bridges;
023:
024: import java.io.Serializable;
025: import java.util.ArrayList;
026: import java.util.Vector;
027: import java.util.Collection;
028:
029: import org.jboss.cache.invalidation.BatchInvalidation;
030: import org.jboss.cache.invalidation.InvalidationManager;
031: import org.jboss.cache.invalidation.InvalidationGroup;
032: import org.jboss.cache.invalidation.InvalidationManagerMBean;
033: import org.jboss.cache.invalidation.BridgeInvalidationSubscription;
034: import org.jboss.cache.invalidation.InvalidationBridgeListener;
035: import org.jboss.ha.framework.interfaces.HAPartition;
036: import org.jboss.ha.framework.interfaces.DistributedState;
037: import org.jboss.ha.framework.interfaces.DistributedReplicantManager;
038: import org.jboss.ha.framework.server.ClusterPartitionMBean;
039: import org.jboss.system.server.ServerConfigUtil;
040:
041: /**
042: * JGroups implementation of a cache invalidation bridge
043: *
044: * @see JGCacheInvalidationBridgeMBean
045: *
046: * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
047: * @version $Revision: 58255 $
048: *
049: * <p><b>Revisions:</b>
050: *
 * <p><b>24 September 2002 Sacha Labourey:</b>
052: * <ul>
053: * <li> First implementation </li>
054: * </ul>
055: */
056:
057: public class JGCacheInvalidationBridge extends
058: org.jboss.system.ServiceMBeanSupport implements
059: JGCacheInvalidationBridgeMBean, DistributedState.DSListenerEx,
060: InvalidationBridgeListener,
061: DistributedReplicantManager.ReplicantListener {
062:
063: // Constants -----------------------------------------------------
064:
065: // Attributes ----------------------------------------------------
066:
067: protected String partitionName = ServerConfigUtil
068: .getDefaultPartitionName();
069: /**
070: * The ClusterPartition with which we are associated.
071: */
072: protected ClusterPartitionMBean clusterPartition;
073: protected String invalidationManagerName = InvalidationManager.DEFAULT_JMX_SERVICE_NAME;
074: protected String bridgeName = "DefaultJGCacheIB";
075:
076: protected HAPartition partition = null;
077: protected DistributedState ds = null;
078: protected DistributedReplicantManager drm = null;
079: protected String RPC_HANLE_NAME = null;
080: protected String nodeName = null;
081:
082: protected InvalidationManagerMBean invalMgr = null;
083: protected BridgeInvalidationSubscription invalidationSubscription = null;
084: protected Collection localGroups = null;
085: protected Vector bridgedGroups = new Vector();
086:
087: protected final Class[] rpc_invalidate_types = new Class[] {
088: String.class, Serializable.class };
089: protected final Class[] rpc_invalidates_types = new Class[] {
090: String.class, Serializable[].class };
091: protected final Class[] rpc_invalidate_all_types = new Class[] { String.class };
092: protected final Class[] rpc_batch_invalidate_types = new Class[] { BatchInvalidation[].class };
093:
094: // Static --------------------------------------------------------
095:
096: // Constructors --------------------------------------------------
097:
/**
 * Creates a bridge with the default configuration; nothing is contacted
 * until startService is invoked.
 */
public JGCacheInvalidationBridge() {
    super();
}
100:
101: // Public --------------------------------------------------------
102:
103: // JGCacheInvalidationBridgeMBean implementation ----------------------------------------------
104:
105: public String getInvalidationManager() {
106: return this .invalidationManagerName;
107: }
108:
109: public ClusterPartitionMBean getClusterPartition() {
110: return clusterPartition;
111: }
112:
113: public void setClusterPartition(
114: ClusterPartitionMBean clusterPartition) {
115: this .clusterPartition = clusterPartition;
116: }
117:
118: public String getPartitionName() {
119: return this .partitionName;
120: }
121:
122: public void setInvalidationManager(String objectName) {
123: this .invalidationManagerName = objectName;
124: }
125:
126: public void setPartitionName(String partitionName) {
127: this .partitionName = partitionName;
128: }
129:
130: public String getBridgeName() {
131: return this .bridgeName;
132: }
133:
134: public void setBridgeName(String name) {
135: this .bridgeName = name;
136: }
137:
138: // DistributedReplicantManager.ReplicantListener implementation ---------------------------
139:
140: /**
141: * @todo examine thread safety. synchronized keyword was added to method
142: * signature when internal behavior of DistributedReplicantManagerImpl was
143: * changed so that multiple threads could concurrently send replicantsChanged
144: * notifications. Need to examine in detail how this method interacts with
145: * DistributedState to see if we can remove/narrow the synchronization.
146: */
147: public synchronized void replicantsChanged(String key,
148: java.util.List newReplicants, int newReplicantsViewId) {
149: if (key.equals(this .RPC_HANLE_NAME)
150: && this .drm.isMasterReplica(this .RPC_HANLE_NAME)) {
151: log
152: .debug("The list of replicant for the JG bridge has changed, computing and updating local info...");
153:
154: // we remove any entry from the DS whose node is dead
155: //
156: java.util.Collection coll = this .ds
157: .getAllKeys(this .RPC_HANLE_NAME);
158: if (coll == null) {
159: log
160: .debug("... No bridge info was associated to this node");
161: return;
162: }
163:
164: // to avoid ConcurrentModificationException, we copy the list of keys in a new structure
165: //
166: ArrayList collCopy = new java.util.ArrayList(coll);
167: java.util.List newReplicantsNodeNames = this .drm
168: .lookupReplicantsNodeNames(this .RPC_HANLE_NAME);
169:
170: for (int i = 0; i < collCopy.size(); i++) {
171: String nodeEntry = (String) collCopy.get(i);
172: if (!newReplicantsNodeNames.contains(nodeEntry)) {
173: // the list of bridged topic contains a dead member: we remove it
174: //
175: try {
176: log
177: .debug("removing bridge information associated to this node from the DS");
178: this .ds.remove(this .RPC_HANLE_NAME, nodeEntry,
179: true);
180: } catch (Exception e) {
181: log
182: .info(
183: "Unable to remove a node entry from the distributed cache",
184: e);
185: }
186: }
187: }
188: }
189: }
190:
191: // DistributedState.DSListener implementation ----------------------------------------------
192:
193: public void valueHasChanged(String category, Serializable key,
194: Serializable value, boolean locallyModified) {
195: this .updatedBridgedInvalidationGroupsInfo();
196: }
197:
198: public void keyHasBeenRemoved(String category, Serializable key,
199: Serializable previousContent, boolean locallyModified) {
200: this .updatedBridgedInvalidationGroupsInfo();
201: }
202:
203: // InvalidationBridgeListener implementation ----------------------------------------------
204:
205: public void batchInvalidate(BatchInvalidation[] invalidations,
206: boolean asynchronous) {
207: if (invalidations == null)
208: return;
209:
210: // we need to sort which group other nodes accept or refuse and propagate through the net
211: //
212: ArrayList acceptedGroups = new ArrayList();
213:
214: for (int i = 0; i < invalidations.length; i++) {
215: BatchInvalidation currBI = invalidations[i];
216: if (groupExistsRemotely(currBI.getInvalidationGroupName()))
217: acceptedGroups.add(currBI);
218: }
219:
220: if (acceptedGroups.size() > 0) {
221: BatchInvalidation[] result = new BatchInvalidation[acceptedGroups
222: .size()];
223: result = (BatchInvalidation[]) acceptedGroups
224: .toArray(result);
225:
226: if (log.isTraceEnabled())
227: log.trace("Transmitting batch invalidation: " + result);
228: this ._do_rpc_batchInvalidate(result, asynchronous);
229: }
230: }
231:
232: public void invalidate(String invalidationGroupName,
233: Serializable[] keys, boolean asynchronous) {
234: // if the group exists on another node, we simply propagate to other nodes
235: //
236: if (log.isTraceEnabled())
237: log.trace("Transmitting invalidations for group: "
238: + invalidationGroupName);
239:
240: if (groupExistsRemotely(invalidationGroupName))
241: _do_rpc_invalidates(invalidationGroupName, keys,
242: asynchronous);
243: }
244:
245: public void invalidate(String invalidationGroupName,
246: Serializable key, boolean asynchronous) {
247: // if the group exists on another node, we simply propagate to other nodes
248: //
249: if (log.isTraceEnabled())
250: log.trace("Transmitting invalidation for group: "
251: + invalidationGroupName);
252:
253: if (groupExistsRemotely(invalidationGroupName))
254: _do_rpc_invalidate(invalidationGroupName, key, asynchronous);
255: }
256:
257: public void invalidateAll(String groupName, boolean async) {
258: if (log.isTraceEnabled())
259: log
260: .trace("Transmitting for all entries for invalidation for group: "
261: + groupName);
262: if (groupExistsRemotely(groupName))
263: _do_rpc_invalidate_all(groupName, async);
264: }
265:
266: public void newGroupCreated(String groupInvalidationName) {
267: try {
268: this .publishLocalInvalidationGroups();
269: //this.updatedBridgedInvalidationGroupsInfo ();
270: } catch (Exception e) {
271: log
272: .info(
273: "Problem while registering a new invalidation group over the cluster",
274: e);
275: }
276: }
277:
278: public void groupIsDropped(String groupInvalidationName) {
279: try {
280: this .publishLocalInvalidationGroups();
281: //this.updatedBridgedInvalidationGroupsInfo ();
282: } catch (Exception e) {
283: log
284: .info(
285: "Problem while un-registering a new invalidation group over the cluster",
286: e);
287: }
288: }
289:
290: // ServiceMBeanSupport overrides ---------------------------------------------------
291:
292: public void startService() throws Exception {
293: RPC_HANLE_NAME = "DCacheBridge-" + this .bridgeName;
294:
295: // Support old-style partition lookup for configs that don't
296: // inject the partition.
297: // TODO remove this after a while; deprecated in 4.0.4
298: if (this .clusterPartition == null) {
299: javax.naming.Context ctx = new javax.naming.InitialContext();
300: this .partition = (HAPartition) ctx.lookup("/HAPartition/"
301: + this .partitionName);
302: } else {
303: this .partition = this .clusterPartition.getHAPartition();
304: this .partitionName = this .partition.getPartitionName();
305: }
306:
307: this .ds = this .partition.getDistributedStateService();
308: this .drm = this .partition.getDistributedReplicantManager();
309: this .nodeName = this .partition.getNodeName();
310:
311: this .drm.add(this .RPC_HANLE_NAME, "");
312: this .drm.registerListener(this .RPC_HANLE_NAME, this );
313: this .ds.registerDSListenerEx(RPC_HANLE_NAME, this );
314: this .partition.registerRPCHandler(RPC_HANLE_NAME, this );
315:
316: // we now publish the list of caches we have access to
317: //
318: this .invalMgr = (org.jboss.cache.invalidation.InvalidationManagerMBean) org.jboss.system.Registry
319: .lookup(this .invalidationManagerName);
320:
321: publishLocalInvalidationGroups();
322: this .updatedBridgedInvalidationGroupsInfo();
323:
324: this .invalidationSubscription = invalMgr
325: .registerBridgeListener(this );
326:
327: }
328:
329: public void stopService() {
330: try {
331: this .partition.unregisterRPCHandler(this .RPC_HANLE_NAME,
332: this );
333: this .ds.unregisterDSListenerEx(this .RPC_HANLE_NAME, this );
334: this .drm.unregisterListener(this .RPC_HANLE_NAME, this );
335: this .drm.remove(this .RPC_HANLE_NAME);
336:
337: this .invalidationSubscription.unregister();
338:
339: this .ds.remove(this .RPC_HANLE_NAME, this .nodeName, true);
340:
341: this .invalMgr = null;
342: this .partition = null;
343: this .drm = null;
344: this .ds = null;
345: this .invalidationSubscription = null;
346: this .RPC_HANLE_NAME = null;
347: this .nodeName = null;
348: this .localGroups = null;
349: this .bridgedGroups = new Vector();
350: } catch (Exception e) {
351: log
352: .info(
353: "Problem while shuting down invalidation cache bridge",
354: e);
355: }
356: }
357:
358: // RPC calls ---------------------------------------------
359:
360: public void _rpc_invalidate(String invalidationGroupName,
361: Serializable key) {
362: if (log.isTraceEnabled())
363: log.trace("Received remote invalidation for group: "
364: + invalidationGroupName);
365:
366: this .invalidationSubscription.invalidate(invalidationGroupName,
367: key);
368: }
369:
370: public void _rpc_invalidates(String invalidationGroupName,
371: Serializable[] keys) {
372: if (log.isTraceEnabled())
373: log.trace("Received remote invalidations for group: "
374: + invalidationGroupName);
375:
376: this .invalidationSubscription.invalidate(invalidationGroupName,
377: keys);
378: }
379:
380: public void _rpc_invalidate_all(String invalidationGroupName) {
381: if (log.isTraceEnabled())
382: log.trace("Received remote invalidate_all for group: "
383: + invalidationGroupName);
384:
385: this .invalidationSubscription
386: .invalidateAll(invalidationGroupName);
387: }
388:
389: public void _rpc_batchInvalidate(BatchInvalidation[] invalidations) {
390: if (log.isTraceEnabled() && invalidations != null)
391: log
392: .trace("Received remote batch invalidation for this number of groups: "
393: + invalidations.length);
394:
395: this .invalidationSubscription.batchInvalidate(invalidations);
396: }
397:
398: protected void _do_rpc_invalidate(String invalidationGroupName,
399: Serializable key, boolean asynch) {
400: Object[] params = new Object[] { invalidationGroupName, key };
401: try {
402: if (asynch)
403: this .partition.callAsynchMethodOnCluster(
404: this .RPC_HANLE_NAME, "_rpc_invalidate", params,
405: rpc_invalidate_types, true);
406: else
407: this .partition.callMethodOnCluster(this .RPC_HANLE_NAME,
408: "_rpc_invalidate", params,
409: rpc_invalidate_types, true);
410: } catch (Exception e) {
411: log
412: .debug("Distributed invalidation (1) has failed for group "
413: + invalidationGroupName
414: + " (Bridge: "
415: + this .bridgeName + ")");
416: }
417: }
418:
419: protected void _do_rpc_invalidates(String invalidationGroupName,
420: Serializable[] keys, boolean asynch) {
421: Object[] params = new Object[] { invalidationGroupName, keys };
422: try {
423: if (asynch)
424: this .partition.callAsynchMethodOnCluster(
425: this .RPC_HANLE_NAME, "_rpc_invalidates",
426: params, rpc_invalidates_types, true);
427: else
428: this .partition.callMethodOnCluster(this .RPC_HANLE_NAME,
429: "_rpc_invalidates", params,
430: rpc_invalidates_types, true);
431: } catch (Exception e) {
432: log
433: .debug("Distributed invalidation (2) has failed for group "
434: + invalidationGroupName
435: + " (Bridge: "
436: + this .bridgeName + ")");
437: }
438: }
439:
440: protected void _do_rpc_invalidate_all(String invalidationGroupName,
441: boolean asynch) {
442: Object[] params = new Object[] { invalidationGroupName };
443: try {
444: if (asynch)
445: this .partition.callAsynchMethodOnCluster(
446: this .RPC_HANLE_NAME, "_rpc_invalidate_all",
447: params, rpc_invalidate_all_types, true);
448: else
449: this .partition.callMethodOnCluster(this .RPC_HANLE_NAME,
450: "_rpc_invalidate_all", params,
451: rpc_invalidate_all_types, true);
452: } catch (Exception e) {
453: log
454: .debug("Distributed invalidation (2) has failed for group "
455: + invalidationGroupName
456: + " (Bridge: "
457: + this .bridgeName + ")");
458: }
459: }
460:
461: protected void _do_rpc_batchInvalidate(
462: BatchInvalidation[] invalidations, boolean asynch) {
463: Object[] params = new Object[] { invalidations };
464: try {
465: if (asynch)
466: this .partition.callAsynchMethodOnCluster(
467: this .RPC_HANLE_NAME, "_rpc_batchInvalidate",
468: params, rpc_batch_invalidate_types, true);
469: else
470: this .partition.callMethodOnCluster(this .RPC_HANLE_NAME,
471: "_rpc_batchInvalidate", params,
472: rpc_batch_invalidate_types, true);
473: } catch (Exception e) {
474: log
475: .debug("Distributed invalidation (3) has failed (Bridge: "
476: + this .bridgeName + ")");
477: }
478: }
479:
480: // Package protected ---------------------------------------------
481:
482: // Protected -----------------------------------------------------
483:
484: protected synchronized void publishLocalInvalidationGroups()
485: throws Exception {
486: this .localGroups = invalMgr.getInvalidationGroups();
487:
488: log.debug("Publishing locally available invalidation groups: "
489: + this .localGroups);
490:
491: ArrayList content = new ArrayList(this .localGroups);
492: ArrayList result = new ArrayList(content.size());
493:
494: for (int i = 0; i < content.size(); i++) {
495: String aGroup = ((InvalidationGroup) content.get(i))
496: .getGroupName();
497: result.add(aGroup);
498: }
499:
500: if (result.size() > 0) {
501: NodeInfo info = new NodeInfo(result, this .nodeName);
502: this .ds.set(this .RPC_HANLE_NAME, this .nodeName, info, true);
503: } else
504: this .ds.remove(this .RPC_HANLE_NAME, this .nodeName, true);
505: }
506:
507: protected void updatedBridgedInvalidationGroupsInfo() {
508: Collection bridgedByNode = this .ds
509: .getAllValues(this .RPC_HANLE_NAME);
510:
511: log
512: .debug("Updating list of invalidation groups that are bridged...");
513:
514: if (bridgedByNode != null) {
515: // Make a copy
516: //
517: ArrayList copy = new ArrayList(bridgedByNode);
518:
519: Vector result = new Vector();
520:
521: for (int i = 0; i < copy.size(); i++) {
522: NodeInfo infoForNode = (NodeInfo) copy.get(i);
523: log.trace("InfoForNode: " + infoForNode);
524:
525: if (infoForNode != null
526: && !infoForNode.groupName.equals(this .nodeName)) {
527: ArrayList groupsForNode = infoForNode.groups;
528: log.trace("Groups for node: " + groupsForNode);
529:
530: for (int j = 0; j < groupsForNode.size(); j++) {
531: String aGroup = (String) groupsForNode.get(j);
532: if (!result.contains(aGroup)) {
533: log.trace("Adding: " + aGroup);
534: result.add(aGroup);
535: }
536: }
537:
538: }
539:
540: }
541: // atomic assignation of the result
542: //
543: this .bridgedGroups = result;
544:
545: log.debug("... computed list of bridged groups: " + result);
546: } else {
547: log.debug("... nothing needs to be bridged.");
548: }
549:
550: }
551:
552: protected boolean groupExistsRemotely(String groupName) {
553: return this .bridgedGroups.contains(groupName);
554: }
555:
556: // Private -------------------------------------------------------
557:
558: // Inner classes -------------------------------------------------
559:
560: }
561:
/**
 * Serializable value stored in the distributed state by a cache invalidation
 * bridge: the names of the invalidation groups available on one node, together
 * with the name of that node.
 */
class NodeInfo implements java.io.Serializable {
    static final long serialVersionUID = -3215712955134929006L;

    /** Names of the invalidation groups available on the node. */
    public ArrayList groups = null;

    /**
     * Name of the node that published this entry. Despite its name this field
     * does not hold a group name; it is not renamed because it is public and
     * part of the serialized form.
     */
    public String groupName = null;

    /** Creates an empty entry; both fields are null. */
    public NodeInfo() {
    }

    /**
     * @param groups    names of the invalidation groups available on the node
     * @param groupName name of the publishing node
     */
    public NodeInfo(ArrayList groups, String groupName) {
        this.groups = groups;
        this.groupName = groupName;
    }

    /**
     * Readable form for log output.
     *
     * @return the node name and the group names held by this entry
     */
    public String toString() {
        return "NodeInfo[node=" + this.groupName + ", groups=" + this.groups + "]";
    }

}
|