001: /*
002: * JBoss, Home of Professional Open Source.
003: * Copyright 2006, Red Hat Middleware LLC, and individual contributors
004: * as indicated by the @author tags. See the copyright.txt file in the
005: * distribution for a full listing of individual contributors.
006: *
007: * This is free software; you can redistribute it and/or modify it
008: * under the terms of the GNU Lesser General Public License as
009: * published by the Free Software Foundation; either version 2.1 of
010: * the License, or (at your option) any later version.
011: *
012: * This software is distributed in the hope that it will be useful,
013: * but WITHOUT ANY WARRANTY; without even the implied warranty of
014: * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015: * Lesser General Public License for more details.
016: *
017: * You should have received a copy of the GNU Lesser General Public
018: * License along with this software; if not, write to the Free
019: * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020: * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021: */
022: package org.jboss.ha.framework.server;
023:
024: import java.io.Serializable;
025: import java.util.ArrayList;
026: import java.util.Collection;
027: import java.util.Collections;
028: import java.util.HashMap;
029: import java.util.Iterator;
030: import java.util.Map;
031: import javax.management.MBeanServer;
032: import javax.management.ObjectName;
033:
034: import org.jboss.ha.framework.interfaces.HAPartition;
035: import org.jboss.ha.framework.interfaces.HAPartition.HAPartitionStateTransfer;
036: import org.jboss.logging.Logger;
037: import org.jboss.system.Registry;
038:
039: /**
040: * This class manages distributed state across the cluster.
041: *
042: * @author <a href="mailto:sacha.labourey@cogito-info.ch">Sacha Labourey</a>.
043: * @author <a href="mailto:bill@burkecentral.com">Bill Burke</a>.
044: * @author Scott.Stark@jboss.org
045: * @version $Revision: 57188 $
046: */
047: public class DistributedStateImpl implements DistributedStateImplMBean,
048: HAPartitionStateTransfer {
049: // Constants -----------------------------------------------------
050:
051: protected final static String SERVICE_NAME = "DistributedState";
052:
053: protected final static Class[] set_types = new Class[] {
054: String.class, Serializable.class, Serializable.class };
055: protected final static Class[] remove_types = new Class[] {
056: String.class, Serializable.class };
057:
058: // Attributes ----------------------------------------------------
059:
060: /**
061: * HashMap<String, HashMap>. Keys= category, value = HashMap<Object, Object>
062: */
063: protected HashMap categories = new HashMap();
064:
065: protected HashMap keyListeners = new HashMap();
066: protected HAPartition partition;
067: protected Logger log;
068: protected MBeanServer mbeanServer = null;
069: protected String name = null;
070:
071: // Static --------------------------------------------------------c
072:
073: // Constructors --------------------------------------------------
074:
075: public DistributedStateImpl() {
076: } // for JMX checks
077:
078: public DistributedStateImpl(HAPartition partition,
079: MBeanServer server) {
080: this .partition = partition;
081: this .mbeanServer = server;
082: this .log = Logger.getLogger(this .getClass());
083: }
084:
085: // Public --------------------------------------------------------
086:
087: public void init() throws Exception {
088: // When we subscribe to state transfer events, GetState will be called to initialize
089: // this service.
090: partition.subscribeToStateTransferEvents(SERVICE_NAME, this );
091: partition.registerRPCHandler(SERVICE_NAME, this );
092:
093: // subscribed this "sub-service" of HAPartition with JMX
094: // TODO: In the future (when state transfer issues will be completed),
095: // we will need to redesign the way HAPartitions and its sub-protocols are
096: // registered with JMX. They will most probably be independant JMX services.
097: //
098: this .name = "jboss:service=" + SERVICE_NAME + ",partitionName="
099: + this .partition.getPartitionName();
100: ObjectName jmxName = new ObjectName(this .name);
101: mbeanServer.registerMBean(this , jmxName);
102: Registry.bind(this .name, this );
103: }
104:
105: public void start() throws Exception {
106: }
107:
108: public void stop() throws Exception {
109: // NR 200505:[JBCLUSTER-38] will be done at destroy instead
110: // Registry.unbind (this.name);
111: // ObjectName jmxName = new ObjectName(this.name);
112: // mbeanServer.unregisterMBean (jmxName);
113: }
114:
115: // NR 200505 : [JBCLUSTER-38] unbind DS MBean
116: public void destroy() throws Exception {
117: Registry.unbind(this .name);
118: ObjectName jmxName = new ObjectName(this .name);
119: mbeanServer.unregisterMBean(jmxName);
120:
121: partition
122: .unsubscribeFromStateTransferEvents(SERVICE_NAME, this );
123: partition.unregisterRPCHandler(SERVICE_NAME, this );
124: }
125:
126: public String listContent() throws Exception {
127: StringBuffer result = new StringBuffer();
128: Collection cats = this .getAllCategories();
129: Iterator catsIter = cats.iterator();
130: while (catsIter.hasNext()) {
131: String category = (String) catsIter.next();
132: Iterator keysIter = this .getAllKeys(category).iterator();
133:
134: result
135: .append("-----------------------------------------------\n");
136: result.append("Category : ").append(category)
137: .append("\n\n");
138: result.append("KEY\t:\tVALUE\n");
139:
140: while (keysIter.hasNext()) {
141: Serializable key = (Serializable) keysIter.next();
142: String value = this .get(category, key).toString();
143: result.append("'").append(key);
144: result.append("'\t:\t'");
145: result.append(value);
146: result.append("'\n");
147: }
148: result.append("\n");
149: }
150: return result.toString();
151: }
152:
153: public String listXmlContent() throws Exception {
154: StringBuffer result = new StringBuffer();
155: Collection cats = this .getAllCategories();
156: Iterator catsIter = cats.iterator();
157:
158: result.append("<DistributedState>\n");
159:
160: while (catsIter.hasNext()) {
161: String category = (String) catsIter.next();
162: Iterator keysIter = this .getAllKeys(category).iterator();
163:
164: result.append("\t<Category>\n");
165: result.append("\t\t<CategoryName>").append(category)
166: .append("</CategoryName>\n");
167:
168: while (keysIter.hasNext()) {
169: Serializable key = (Serializable) keysIter.next();
170: String value = this .get(category, key).toString();
171: result.append("\t\t<Entry>\n");
172: result.append("\t\t\t<Key>").append(key).append(
173: "</Key>\n");
174: result.append("\t\t\t<Value>").append(value).append(
175: "</Value>\n");
176: result.append("\t\t</Entry>\n");
177: }
178: result.append("\t</Category>\n");
179: }
180: result.append("</DistributedState>\n");
181:
182: return result.toString();
183: }
184:
185: // DistributedState implementation ----------------------------------------------
186:
187: public void set(String category, Serializable key,
188: Serializable value) throws Exception {
189: set(category, key, value, true);
190: }
191:
192: public void set(String category, Serializable key,
193: Serializable value, boolean asynchronousCall)
194: throws Exception {
195: Object[] args = { category, key, value };
196: if (asynchronousCall)
197: partition.callAsynchMethodOnCluster(SERVICE_NAME, "_set",
198: args, set_types, true);
199: else
200: partition.callMethodOnCluster(SERVICE_NAME, "_set", args,
201: set_types, true);
202: this ._setInternal(category, key, value);
203: notifyKeyListeners(category, key, value, true);
204: }
205:
206: public Serializable remove(String category, Serializable key)
207: throws Exception {
208: return remove(category, key, true);
209: }
210:
211: public Serializable remove(String category, Serializable key,
212: boolean asynchronousCall) throws Exception {
213: Object[] args = { category, key };
214: if (asynchronousCall)
215: partition.callAsynchMethodOnCluster(SERVICE_NAME,
216: "_remove", args, remove_types, true);
217: else
218: partition.callMethodOnCluster(SERVICE_NAME, "_remove",
219: args, remove_types, true);
220: Serializable removed = this ._removeInternal(category, key);
221: notifyKeyListenersOfRemove(category, key, removed, true);
222: return removed;
223: }
224:
225: public Serializable get(String category, Serializable key) {
226: synchronized (this .categories) {
227: HashMap cat = (HashMap) categories.get(category);
228: if (cat == null)
229: return null;
230:
231: return (Serializable) cat.get(key);
232: }
233: }
234:
235: public Collection getAllCategories() {
236: synchronized (this .categories) {
237: return Collections.unmodifiableCollection(categories
238: .keySet());
239: }
240: }
241:
242: public Collection getAllKeys(String category) {
243: synchronized (this .categories) {
244: HashMap cat = (HashMap) categories.get(category);
245: if (cat == null)
246: return null;
247:
248: return Collections.unmodifiableCollection(cat.keySet());
249: }
250: }
251:
252: public Collection getAllValues(String category) {
253: synchronized (this .categories) {
254: HashMap cat = (HashMap) categories.get(category);
255: if (cat == null)
256: return null;
257:
258: return Collections.unmodifiableCollection(cat.values());
259: }
260: }
261:
262: public void registerDSListenerEx(String category,
263: DSListenerEx subscriber) {
264: registerListener(category, subscriber);
265: }
266:
267: public void unregisterDSListenerEx(String category,
268: DSListenerEx subscriber) {
269: unregisterListener(category, subscriber);
270: }
271:
272: public void registerDSListener(String category,
273: DSListener subscriber) {
274: registerListener(category, subscriber);
275: }
276:
277: public void unregisterDSListener(String category,
278: DSListener subscriber) {
279: unregisterListener(category, subscriber);
280: }
281:
282: // HAPartition RPC method invocations implementation ----------------------------------------------
283:
284: public void _set(String category, String key, Serializable value)
285: throws Exception {
286: this ._setInternal(category, key, value);
287: notifyKeyListeners(category, key, value, false);
288: }
289:
290: public void _set(String category, Serializable key,
291: Serializable value) throws Exception {
292: this ._setInternal(category, key, value);
293: notifyKeyListeners(category, key, value, false);
294: }
295:
296: public void _setInternal(String category, Serializable key,
297: Serializable value) throws Exception {
298: synchronized (this .categories) {
299: HashMap cat = (HashMap) categories.get(category);
300: if (cat == null) {
301: cat = new HashMap();
302: categories.put(category, cat);
303: }
304: cat.put(key, value);
305: }
306: }
307:
308: public void _remove(String category, String key) throws Exception {
309: Serializable removed = this ._removeInternal(category, key);
310: notifyKeyListenersOfRemove(category, key, removed, false);
311: }
312:
313: public void _remove(String category, Serializable key)
314: throws Exception {
315: Serializable removed = this ._removeInternal(category, key);
316: notifyKeyListenersOfRemove(category, key, removed, false);
317: }
318:
319: public Serializable _removeInternal(String category,
320: Serializable key) throws Exception {
321: synchronized (this .categories) {
322: HashMap cat = (HashMap) categories.get(category);
323: if (cat == null)
324: return null;
325: Object removed = cat.remove(key);
326: if (removed != null) {
327: if (cat.size() == 0) {
328: categories.remove(category);
329: }
330: }
331: return (Serializable) removed;
332: }
333: }
334:
335: // HAPartitionStateTransfer implementation ----------------------------------------------
336:
337: public Serializable getCurrentState() {
338: HashMap retval = new HashMap();
339: Map.Entry entry;
340: String catName;
341: HashMap value, newVal;
342:
343: synchronized (this .categories) {
344:
345: for (Iterator it = this .categories.entrySet().iterator(); it
346: .hasNext();) {
347: entry = (Map.Entry) it.next(); // key: category name, value: HashMap
348: catName = (String) entry.getKey();
349: value = (HashMap) entry.getValue();
350: newVal = value != null ? (HashMap) value.clone() : null;
351: retval.put(catName, newVal);
352: }
353:
354: return retval;
355: }
356: }
357:
358: public void setCurrentState(Serializable newState) {
359: synchronized (this .categories) {
360: categories.clear();
361: categories.putAll((HashMap) newState);
362: if (keyListeners.size() > 0) {
363: cleanupKeyListeners();
364: }
365: }
366: }
367:
368: // Package protected ---------------------------------------------
369:
370: // Protected -----------------------------------------------------
371:
372: protected void registerListener(String category, Object subscriber) {
373: synchronized (this .keyListeners) {
374: ArrayList listeners = (ArrayList) keyListeners
375: .get(category);
376: if (listeners == null) {
377: listeners = new ArrayList();
378: keyListeners.put(category, listeners);
379: }
380: listeners.add(subscriber);
381: }
382: }
383:
384: protected void unregisterListener(String category, Object subscriber) {
385: synchronized (this .keyListeners) {
386: ArrayList listeners = (ArrayList) keyListeners
387: .get(category);
388: if (listeners == null)
389: return;
390:
391: listeners.remove(subscriber);
392: if (listeners.size() == 0) {
393: keyListeners.remove(category);
394: }
395: }
396: }
397:
398: protected void notifyKeyListeners(String category,
399: Serializable key, Serializable value,
400: boolean locallyModified) {
401: synchronized (this .keyListeners) {
402: ArrayList listeners = (ArrayList) keyListeners
403: .get(category);
404: if (listeners == null)
405: return;
406: String strKey = key.toString();
407:
408: for (int i = 0; i < listeners.size(); i++) {
409: Object listener = listeners.get(i);
410: if (listener instanceof DSListener) {
411: DSListener dslistener = (DSListener) listener;
412: dslistener.valueHasChanged(category, strKey, value,
413: locallyModified);
414: } else {
415: DSListenerEx dslistener = (DSListenerEx) listener;
416: dslistener.valueHasChanged(category, key, value,
417: locallyModified);
418: }
419: }
420: }
421: }
422:
423: protected void notifyKeyListenersOfRemove(String category,
424: Serializable key, Serializable oldContent,
425: boolean locallyModified) {
426: synchronized (this .keyListeners) {
427: ArrayList listeners = (ArrayList) keyListeners
428: .get(category);
429: if (listeners == null)
430: return;
431: String strKey = key.toString();
432:
433: for (int i = 0; i < listeners.size(); i++) {
434: Object listener = listeners.get(i);
435: if (listener instanceof DSListener) {
436: DSListener dslistener = (DSListener) listener;
437: dslistener.keyHasBeenRemoved(category, strKey,
438: oldContent, locallyModified);
439: } else {
440: DSListenerEx dslistener = (DSListenerEx) listener;
441: dslistener.keyHasBeenRemoved(category, key,
442: oldContent, locallyModified);
443: }
444: }
445: }
446: }
447:
448: protected void cleanupKeyListeners() {
449: // NOT IMPLEMENTED YET
450: }
451:
452: // Private -------------------------------------------------------
453:
454: // Inner classes -------------------------------------------------
455:
456: }
|