001: /*
002: * Copyright (c) 2002-2003 by OpenSymphony
003: * All rights reserved.
004: */
005: package com.opensymphony.oscache.plugins.clustersupport;
006:
007: import com.opensymphony.oscache.base.Cache;
008: import com.opensymphony.oscache.base.Config;
009: import com.opensymphony.oscache.base.FinalizationException;
010: import com.opensymphony.oscache.base.InitializationException;
011:
012: import org.apache.commons.logging.Log;
013: import org.apache.commons.logging.LogFactory;
014:
015: import org.jgroups.Address;
016: import org.jgroups.Channel;
017:
018: import org.jgroups.blocks.NotificationBus;
019:
020: import java.io.Serializable;
021:
022: /**
023: * <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
024: * the JavaGroups library. This Class uses JavaGroups to broadcast cache flush
025: * messages across a cluster.</p>
026: *
027: * <p>One of the following properties should be configured in <code>oscache.properties</code> for
028: * this listener:
029: * <ul>
030: * <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
031: * <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
032: * control over the behaviour of JavaGroups</li>
033: * </ul>
034: * Please refer to the clustering documentation for further details on the configuration of this listener.</p>
035: *
036: * @author <a href="mailto:chris@swebtec.com">Chris Miller</a>
037: */
038: public class JavaGroupsBroadcastingListener extends
039: AbstractBroadcastingListener implements
040: NotificationBus.Consumer {
041: private final static Log log = LogFactory
042: .getLog(JavaGroupsBroadcastingListener.class);
043: private static final String BUS_NAME = "OSCacheBus";
044: private static final String CHANNEL_PROPERTIES = "cache.cluster.properties";
045: private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip";
046:
047: /**
048: * The first half of the default channel properties. They default channel properties are:
049: * <pre>
050: * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
051: * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
052: * PING(timeout=2000;num_initial_members=3):\
053: * MERGE2(min_interval=5000;max_interval=10000):\
054: * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
055: * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
056: * UNICAST(timeout=300,600,1200,2400):\
057: * pbcast.STABLE(desired_avg_gossip=20000):\
058: * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
059: * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
060: * </pre>
061: *
062: * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
063: */
064: private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr=";
065:
066: /**
067: * The second half of the default channel properties. They default channel properties are:
068: * <pre>
069: * UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;\
070: * mcast_send_buf_size=150000;mcast_recv_buf_size=80000):\
071: * PING(timeout=2000;num_initial_members=3):\
072: * MERGE2(min_interval=5000;max_interval=10000):\
073: * FD_SOCK:VERIFY_SUSPECT(timeout=1500):\
074: * pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):\
075: * UNICAST(timeout=300,600,1200,2400):\
076: * pbcast.STABLE(desired_avg_gossip=20000):\
077: * FRAG(frag_size=8096;down_thread=false;up_thread=false):\
078: * pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
079: * </pre>
080: *
081: * Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
082: */
083: private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):"
084: + "PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):"
085: + "pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800;max_xmit_size=8192):UNICAST(timeout=300,600,1200,2400):pbcast.STABLE(desired_avg_gossip=20000):"
086: + "FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)";
087: private static final String DEFAULT_MULTICAST_IP = "231.12.21.132";
088: private NotificationBus bus;
089:
090: /**
091: * Initializes the broadcasting listener by starting up a JavaGroups notification
092: * bus instance to handle incoming and outgoing messages.
093: *
094: * @param config An OSCache configuration object.
095: * @throws com.opensymphony.oscache.base.InitializationException If this listener has
096: * already been initialized.
097: */
098: public synchronized void initialize(Cache cache, Config config)
099: throws InitializationException {
100: super .initialize(cache, config);
101:
102: String properties = config.getProperty(CHANNEL_PROPERTIES);
103: String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY);
104:
105: if ((properties == null) && (multicastIP == null)) {
106: multicastIP = DEFAULT_MULTICAST_IP;
107: }
108:
109: if (properties == null) {
110: properties = DEFAULT_CHANNEL_PROPERTIES_PRE
111: + multicastIP.trim()
112: + DEFAULT_CHANNEL_PROPERTIES_POST;
113: } else {
114: properties = properties.trim();
115: }
116:
117: if (log.isInfoEnabled()) {
118: log
119: .info("Starting a new JavaGroups broadcasting listener with properties="
120: + properties);
121: }
122:
123: try {
124: bus = new NotificationBus(BUS_NAME, properties);
125: bus.start();
126: bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false));
127: bus.setConsumer(this );
128: log
129: .info("JavaGroups clustering support started successfully");
130: } catch (Exception e) {
131: throw new InitializationException("Initialization failed: "
132: + e);
133: }
134: }
135:
136: /**
137: * Shuts down the JavaGroups being managed by this listener. This
138: * occurs once the cache is shut down and this listener is no longer
139: * in use.
140: *
141: * @throws com.opensymphony.oscache.base.FinalizationException
142: */
143: public synchronized void finialize() throws FinalizationException {
144: if (log.isInfoEnabled()) {
145: log.info("JavaGroups shutting down...");
146: }
147:
148: // It's possible that the notification bus is null (CACHE-154)
149: if (bus != null) {
150: bus.stop();
151: bus = null;
152: } else {
153: log
154: .warn("Notification bus wasn't initialized or finialize was invoked before!");
155: }
156:
157: if (log.isInfoEnabled()) {
158: log.info("JavaGroups shutdown complete.");
159: }
160: }
161:
162: /**
163: * Uses JavaGroups to broadcast the supplied notification message across the cluster.
164: *
165: * @param message The cluster nofication message to broadcast.
166: */
167: protected void sendNotification(ClusterNotification message) {
168: bus.sendNotification(message);
169: }
170:
171: /**
172: * Handles incoming notification messages from JavaGroups. This method should
173: * never be called directly.
174: *
175: * @param serializable The incoming message object. This must be a {@link ClusterNotification}.
176: */
177: public void handleNotification(Serializable serializable) {
178: if (!(serializable instanceof ClusterNotification)) {
179: log
180: .error("An unknown cluster notification message received (class="
181: + serializable.getClass().getName()
182: + "). Notification ignored.");
183:
184: return;
185: }
186:
187: handleClusterNotification((ClusterNotification) serializable);
188: }
189:
190: /**
191: * We are not using the caching, so we just return something that identifies
192: * us. This method should never be called directly.
193: */
194: public Serializable getCache() {
195: return "JavaGroupsBroadcastingListener: "
196: + bus.getLocalAddress();
197: }
198:
199: /**
200: * A callback that is fired when a new member joins the cluster. This
201: * method should never be called directly.
202: *
203: * @param address The address of the member who just joined.
204: */
205: public void memberJoined(Address address) {
206: if (log.isInfoEnabled()) {
207: log.info("A new member at address '" + address
208: + "' has joined the cluster");
209: }
210: }
211:
212: /**
213: * A callback that is fired when an existing member leaves the cluster.
214: * This method should never be called directly.
215: *
216: * @param address The address of the member who left.
217: */
218: public void memberLeft(Address address) {
219: if (log.isInfoEnabled()) {
220: log.info("Member at address '" + address
221: + "' left the cluster");
222: }
223: }
224: }
|