001: // $Id: JChannelFactory.java,v 1.33.2.2 2006/12/08 00:41:17 vlada Exp $
002:
003: package org.jgroups;
004:
005: import org.apache.commons.logging.Log;
006: import org.apache.commons.logging.LogFactory;
007: import org.jgroups.conf.ConfiguratorFactory;
008: import org.jgroups.conf.ProtocolStackConfigurator;
009: import org.jgroups.conf.XmlConfigurator;
010: import org.jgroups.jmx.JmxConfigurator;
011: import org.jgroups.mux.Multiplexer;
012: import org.jgroups.mux.MuxChannel;
013: import org.jgroups.util.Util;
014: import org.w3c.dom.*;
015:
016: import javax.management.MBeanServer;
017: import javax.xml.parsers.DocumentBuilder;
018: import javax.xml.parsers.DocumentBuilderFactory;
019: import java.io.File;
020: import java.io.FileNotFoundException;
021: import java.io.IOException;
022: import java.io.InputStream;
023: import java.net.URL;
024: import java.util.HashMap;
025: import java.util.Iterator;
026: import java.util.Map;
027: import java.util.Set;
028:
029: /**
030: * JChannelFactory creates pure Java implementations of the <code>Channel</code>
031: * interface.
032: * See {@link JChannel} for a discussion of channel properties.
033: */
034: public class JChannelFactory implements ChannelFactory {
035: private ProtocolStackConfigurator configurator;
036:
037: private Log log = LogFactory.getLog(getClass());
038:
039: /** Map<String,String>. Hashmap which maps stack names to JGroups configurations. Keys are stack names, values are
040: * plain JGroups stack configs. This is (re-)populated whenever a setMultiplexerConfig() method is called */
041: private final Map stacks = new HashMap();
042:
043: /** Map<String,Entry>, maintains mapping between stack names (e.g. "udp") and Entries, which contain a JChannel and
044: * a Multiplexer */
045: private final Map channels = new HashMap();
046:
047: private String config = null;
048:
049: /** The MBeanServer to expose JMX management data with (no management data will be available if null) */
050: private MBeanServer server = null;
051:
052: /** To expose the channels and protocols */
053: private String domain = null;
054:
055: /** Whether or not to expose channels via JMX */
056: private boolean expose_channels = true;
057:
058: /** Whether to expose the factory only, or all protocols as well */
059: private boolean expose_protocols = true;
060:
061: // private Log log=LogFactory.getLog(getClass());
062: private final static String PROTOCOL_STACKS = "protocol_stacks";
063: private final static String STACK = "stack";
064: private static final String NAME = "name";
065: // private static final String DESCR="description";
066: private static final String CONFIG = "config";
067:
068: /**
069: * Constructs a <code>JChannelFactory</code> instance that contains no
070: * protocol stack configuration.
071: */
072: public JChannelFactory() {
073: }
074:
075: /**
076: * Constructs a <code>JChannelFactory</code> instance that utilizes the
077: * specified file for protocl stack configuration.
078: *
079: * @param properties a file containing a JGroups XML protocol stack
080: * configuration.
081: *
082: * @throws ChannelException if problems occur during the interpretation of
083: * the protocol stack configuration.
084: */
085: public JChannelFactory(File properties) throws ChannelException {
086: configurator = ConfiguratorFactory
087: .getStackConfigurator(properties);
088: }
089:
090: /**
091: * Constructs a <code>JChannelFactory</code> instance that utilizes the
092: * specified file for protocl stack configuration.
093: *
094: * @param properties a XML element containing a JGroups XML protocol stack
095: * configuration.
096: *
097: * @throws ChannelException if problems occur during the interpretation of
098: * the protocol stack configuration.
099: */
100: public JChannelFactory(Element properties) throws ChannelException {
101: configurator = ConfiguratorFactory
102: .getStackConfigurator(properties);
103: }
104:
105: /**
106: * Constructs a <code>JChannelFactory</code> instance that utilizes the
107: * specified file for protocl stack configuration.
108: *
109: * @param properties a URL pointing to a JGroups XML protocol stack
110: * configuration.
111: *
112: * @throws ChannelException if problems occur during the interpretation of
113: * the protocol stack configuration.
114: */
115: public JChannelFactory(URL properties) throws ChannelException {
116: configurator = ConfiguratorFactory
117: .getStackConfigurator(properties);
118: }
119:
120: /**
121: * Constructs a <code>JChannel</code> instance with the protocol stack
122: * configuration based upon the specified properties parameter.
123: *
124: * @param properties an old style property string, a string representing a
125: * system resource containing a JGroups XML configuration,
126: * a string representing a URL pointing to a JGroups XML
127: * XML configuration, or a string representing a file name
128: * that contains a JGroups XML configuration.
129: *
130: * @throws ChannelException if problems occur during the interpretation of
131: * the protocol stack configuration.
132: */
133: public JChannelFactory(String properties) throws ChannelException {
134: configurator = ConfiguratorFactory
135: .getStackConfigurator(properties);
136: }
137:
138: public void setMultiplexerConfig(Object properties)
139: throws Exception {
140: InputStream input = ConfiguratorFactory
141: .getConfigStream(properties);
142: if (input == null)
143: throw new FileNotFoundException(properties.toString());
144: try {
145: parse(input);
146: } catch (Exception ex) {
147: throw new Exception("failed parsing " + properties, ex);
148: } finally {
149: Util.close(input);
150: }
151: }
152:
153: public void setMultiplexerConfig(File file) throws Exception {
154: InputStream input = ConfiguratorFactory.getConfigStream(file);
155: if (input == null)
156: throw new FileNotFoundException(file.toString());
157: try {
158: parse(input);
159: } catch (Exception ex) {
160: throw new Exception("failed parsing " + file.toString(), ex);
161: } finally {
162: Util.close(input);
163: }
164: }
165:
166: public void setMultiplexerConfig(Element properties)
167: throws Exception {
168: parse(properties);
169: }
170:
171: public void setMultiplexerConfig(URL url) throws Exception {
172: InputStream input = ConfiguratorFactory.getConfigStream(url);
173: if (input == null)
174: throw new FileNotFoundException(url.toString());
175: try {
176: parse(input);
177: } catch (Exception ex) {
178: throw new Exception("failed parsing " + url.toString(), ex);
179: } finally {
180: Util.close(input);
181: }
182: }
183:
184: public String getMultiplexerConfig() {
185: return config;
186: }
187:
188: public void setMultiplexerConfig(String properties)
189: throws Exception {
190: InputStream input = ConfiguratorFactory
191: .getConfigStream(properties);
192: if (input == null)
193: throw new FileNotFoundException(properties);
194: try {
195: parse(input);
196: this .config = properties;
197: } catch (Exception ex) {
198: throw new Exception("failed parsing " + properties, ex);
199: } finally {
200: Util.close(input);
201: }
202: }
203:
204: public String getDomain() {
205: return domain;
206: }
207:
208: public void setDomain(String domain) {
209: this .domain = domain;
210: }
211:
212: public boolean isExposeChannels() {
213: return expose_channels;
214: }
215:
216: public void setExposeChannels(boolean expose_channels) {
217: this .expose_channels = expose_channels;
218: }
219:
220: public boolean isExposeProtocols() {
221: return expose_protocols;
222: }
223:
224: public void setExposeProtocols(boolean expose_protocols) {
225: this .expose_protocols = expose_protocols;
226: if (expose_protocols)
227: this .expose_channels = true;
228: }
229:
230: /**
231: * Creates a <code>JChannel</code> implementation of the
232: * <code>Channel</code> interface.
233: *
234: * @param properties the protocol stack configuration information; a
235: * <code>null</code> value means use the default protocol
236: * stack configuration.
237: *
238: * @throws ChannelException if the creation of the channel failed.
239: *
240: * @deprecated <code>JChannel</code>'s conversion to type-specific
241: * construction, and the subsequent deprecation of its
242: * <code>JChannel(Object)</code> constructor, necessitate the
243: * deprecation of this factory method as well. Type-specific
244: * protocol stack configuration should be specfied during
245: * construction of an instance of this factory.
246: */
247: public Channel createChannel(Object properties)
248: throws ChannelException {
249: return new JChannel(properties);
250: }
251:
252: /**
253: * Creates a <code>JChannel</code> implementation of the
254: * <code>Channel<code> interface using the protocol stack configuration
255: * information specfied during construction of an instance of this factory.
256: *
257: * @throws ChannelException if the creation of the channel failed.
258: */
259: public Channel createChannel() throws ChannelException {
260: return new JChannel(configurator);
261: }
262:
263: public Channel createMultiplexerChannel(String stack_name, String id)
264: throws Exception {
265: return createMultiplexerChannel(stack_name, id, false, null);
266: }
267:
268: public Channel createMultiplexerChannel(String stack_name,
269: String id, boolean register_for_state_transfer,
270: String substate_id) throws Exception {
271: if (stack_name == null || id == null)
272: throw new IllegalArgumentException(
273: "stack name and service ID have to be non null");
274: Entry entry;
275: synchronized (channels) {
276: entry = (Entry) channels.get(stack_name);
277: if (entry == null) {
278: entry = new Entry();
279: channels.put(stack_name, entry);
280: }
281: }
282: synchronized (entry) {
283: JChannel ch = entry.channel;
284: if (ch == null) {
285: String props = getConfig(stack_name);
286: ch = new JChannel(props);
287: entry.channel = ch;
288: if (expose_channels && server != null)
289: registerChannel(ch, stack_name);
290: }
291: Multiplexer mux = entry.multiplexer;
292: if (mux == null) {
293: mux = new Multiplexer(ch);
294: entry.multiplexer = mux;
295: }
296: if (register_for_state_transfer)
297: mux.registerForStateTransfer(id, substate_id);
298: return mux.createMuxChannel(this , id, stack_name);
299: }
300: }
301:
302: /**
303: * Returns true if this factory has already registered MuxChannel with
304: * given stack_name and an id, false otherwise.
305: *
306: * @param stack_name name of the stack used
307: * @param id service id
308: * @return true if such MuxChannel exists, false otherwise
309: */
310: public boolean hasMuxChannel(String stack_name, String id) {
311: Entry entry = null;
312: synchronized (channels) {
313: entry = (Entry) channels.get(stack_name);
314: }
315: if (entry != null) {
316: synchronized (entry) {
317: if (entry.multiplexer != null) {
318: Set services = entry.multiplexer.getServiceIds();
319: return (services != null && services.contains(id));
320: }
321: }
322: }
323: return false;
324: }
325:
326: private void registerChannel(JChannel ch, String stack_name)
327: throws Exception {
328: JmxConfigurator.registerChannel(ch, server, domain, stack_name,
329: expose_protocols);
330: }
331:
332: /** Unregisters everything under stack_name (including stack_name) */
333: private void unregister(String name) throws Exception {
334: JmxConfigurator.unregister(server, name);
335: }
336:
337: public void connect(MuxChannel ch) throws ChannelException {
338: Entry entry;
339: synchronized (channels) {
340: entry = (Entry) channels.get(ch.getStackName());
341: }
342: if (entry != null) {
343: synchronized (entry) {
344: if (entry.channel == null)
345: throw new ChannelException(
346: "channel has to be created before it can be connected");
347:
348: if (entry.multiplexer != null)
349: entry.multiplexer.addServiceIfNotPresent(
350: ch.getId(), ch);
351:
352: if (!entry.channel.isConnected()) {
353: entry.channel.connect(ch.getStackName());
354: if (entry.multiplexer != null) {
355: try {
356: entry.multiplexer.fetchServiceInformation();
357: } catch (Exception e) {
358: if (log.isErrorEnabled())
359: log
360: .error(
361: "failed fetching service state",
362: e);
363: }
364: }
365: }
366: if (entry.multiplexer != null) {
367: try {
368: Address addr = entry.channel.getLocalAddress();
369: if (entry.channel.flushSupported()) {
370: boolean successfulFlush = entry.channel
371: .startFlush(3000, false);
372: if (!successfulFlush && log.isWarnEnabled()) {
373: log.warn("Flush failed at "
374: + ch.getLocalAddress()
375: + ch.getId());
376: }
377: }
378: entry.multiplexer.sendServiceUpMessage(ch
379: .getId(), addr, true);
380: } catch (Exception e) {
381: if (log.isErrorEnabled())
382: log
383: .error(
384: "failed sending SERVICE_UP message",
385: e);
386: } finally {
387: if (entry.channel.flushSupported())
388: entry.channel.stopFlush();
389: }
390: }
391: }
392: }
393: ch.setClosed(false);
394: ch.setConnected(true);
395: }
396:
397: public void disconnect(MuxChannel ch) {
398: Entry entry;
399:
400: synchronized (channels) {
401: entry = (Entry) channels.get(ch.getStackName());
402: }
403: if (entry != null) {
404: synchronized (entry) {
405: Multiplexer mux = entry.multiplexer;
406: if (mux != null) {
407: Address addr = entry.channel.getLocalAddress();
408: try {
409: if (entry.channel.flushSupported()) {
410: boolean successfulFlush = entry.channel
411: .startFlush(3000, false);
412: if (!successfulFlush && log.isWarnEnabled()) {
413: log.warn("Flush failed at "
414: + ch.getLocalAddress()
415: + ch.getId());
416: }
417: }
418: mux.sendServiceDownMessage(ch.getId(), addr,
419: true);
420: } catch (Exception e) {
421: if (log.isErrorEnabled())
422: log
423: .error(
424: "failed sending SERVICE_DOWN message",
425: e);
426: } finally {
427: if (entry.channel.flushSupported())
428: entry.channel.stopFlush();
429: }
430: mux.disconnect(); // disconnects JChannel if all MuxChannels are in disconnected state
431: }
432: }
433: }
434: }
435:
436: public void close(MuxChannel ch) {
437: Entry entry;
438: String stack_name = ch.getStackName();
439: boolean all_closed = false;
440:
441: synchronized (channels) {
442: entry = (Entry) channels.get(stack_name);
443: }
444: if (entry != null) {
445: synchronized (entry) {
446: Multiplexer mux = entry.multiplexer;
447: if (mux != null) {
448: Address addr = entry.channel.getLocalAddress();
449: if (addr != null) {
450: try {
451: if (entry.channel.flushSupported()) {
452: boolean successfulFlush = entry.channel
453: .startFlush(3000, false);
454: if (!successfulFlush
455: && log.isWarnEnabled()) {
456: log.warn("Flush failed at "
457: + ch.getLocalAddress()
458: + ch.getId());
459: }
460: }
461: mux.sendServiceDownMessage(ch.getId(),
462: addr, true);
463: } catch (Exception e) {
464: if (log.isErrorEnabled())
465: log
466: .error(
467: "failed sending SERVICE_DOWN message",
468: e);
469: } finally {
470: if (entry.channel.flushSupported())
471: entry.channel.stopFlush();
472: }
473: }
474: all_closed = mux.close(); // closes JChannel if all MuxChannels are in closed state
475: }
476: }
477: if (all_closed) {
478: channels.remove(stack_name);
479: }
480: if (expose_channels && server != null) {
481: try {
482: unregister(domain + ":*,cluster=" + stack_name);
483: } catch (Exception e) {
484: log.error("failed unregistering channel "
485: + stack_name, e);
486: }
487: }
488: }
489: }
490:
491: public void shutdown(MuxChannel ch) {
492: Entry entry;
493: String stack_name = ch.getStackName();
494: boolean all_closed = false;
495:
496: synchronized (channels) {
497: entry = (Entry) channels.get(stack_name);
498: if (entry != null) {
499: synchronized (entry) {
500: Multiplexer mux = entry.multiplexer;
501: if (mux != null) {
502: Address addr = entry.channel.getLocalAddress();
503: try {
504: if (entry.channel.flushSupported()) {
505: boolean successfulFlush = entry.channel
506: .startFlush(3000, false);
507: if (!successfulFlush
508: && log.isWarnEnabled()) {
509: log.warn("Flush failed at "
510: + ch.getLocalAddress()
511: + ch.getId());
512: }
513: }
514: mux.sendServiceDownMessage(ch.getId(),
515: addr, true);
516: } catch (Exception e) {
517: if (log.isErrorEnabled())
518: log
519: .error(
520: "failed sending SERVICE_DOWN message",
521: e);
522: } finally {
523: if (entry.channel.flushSupported())
524: entry.channel.stopFlush();
525: }
526: all_closed = mux.shutdown(); // closes JChannel if all MuxChannels are in closed state
527:
528: //mux.unregister(ch.getId());
529: }
530: }
531: if (all_closed) {
532: channels.remove(stack_name);
533: }
534: if (expose_channels && server != null) {
535: try {
536: unregister(domain + ":*,cluster=" + stack_name);
537: } catch (Exception e) {
538: log.error("failed unregistering channel "
539: + stack_name, e);
540: }
541: }
542: }
543: }
544: }
545:
546: public void open(MuxChannel ch) throws ChannelException {
547: Entry entry;
548: synchronized (channels) {
549: entry = (Entry) channels.get(ch.getStackName());
550: }
551: if (entry != null) {
552: synchronized (entry) {
553: if (entry.channel == null)
554: throw new ChannelException(
555: "channel has to be created before it can be opened");
556: if (!entry.channel.isOpen())
557: entry.channel.open();
558: }
559: }
560: ch.setClosed(false);
561: ch.setConnected(false); // needs to be connected next
562: }
563:
564: public void create() throws Exception {
565: if (expose_channels) {
566: server = Util.getMBeanServer();
567: if (server == null)
568: throw new Exception(
569: "No MBeanServer found; JChannelFactory needs to be run with an MBeanServer present, "
570: + "e.g. inside JBoss or JDK 5, or with ExposeChannel set to false");
571: if (domain == null)
572: domain = "jgroups:name=Multiplexer";
573: }
574: }
575:
576: public void start() throws Exception {
577:
578: }
579:
580: public void stop() {
581:
582: }
583:
584: public void destroy() {
585: synchronized (channels) {
586: Entry entry;
587: Map.Entry tmp;
588: for (Iterator it = channels.entrySet().iterator(); it
589: .hasNext();) {
590: tmp = (Map.Entry) it.next();
591: entry = (Entry) tmp.getValue();
592: if (entry.multiplexer != null)
593: entry.multiplexer.closeAll();
594: if (entry.channel != null)
595: entry.channel.close();
596:
597: }
598: if (expose_channels && server != null) {
599: try {
600: unregister(domain + ":*");
601: } catch (Throwable e) {
602: log.error("failed unregistering domain " + domain,
603: e);
604: }
605: }
606: channels.clear();
607: }
608: }
609:
610: public String dumpConfiguration() {
611: if (stacks != null) {
612: return stacks.keySet().toString();
613: } else
614: return null;
615: }
616:
617: public String dumpChannels() {
618: if (channels == null)
619: return null;
620: StringBuffer sb = new StringBuffer();
621: for (Iterator it = channels.entrySet().iterator(); it.hasNext();) {
622: Map.Entry entry = (Map.Entry) it.next();
623: sb.append(entry.getKey()).append(": ").append(
624: ((Entry) entry.getValue()).multiplexer
625: .getServiceIds()).append("\n");
626: }
627: return sb.toString();
628: }
629:
630: private void parse(InputStream input) throws Exception {
631: /**
632: * CAUTION: crappy code ahead ! I (bela) am not an XML expert, so the code below is pretty amateurish...
633: * But it seems to work, and it is executed only on startup, so no perf loss on the critical path.
634: * If somebody wants to improve this, please be my guest.
635: */
636: DocumentBuilderFactory factory = DocumentBuilderFactory
637: .newInstance();
638: factory.setValidating(false); //for now
639: DocumentBuilder builder = factory.newDocumentBuilder();
640: Document document = builder.parse(input);
641:
642: // The root element of the document should be the "config" element,
643: // but the parser(Element) method checks this so a check is not
644: // needed here.
645: Element configElement = document.getDocumentElement();
646: parse(configElement);
647: }
648:
649: private void parse(Element root) throws Exception {
650: /**
651: * CAUTION: crappy code ahead ! I (bela) am not an XML expert, so the code below is pretty amateurish...
652: * But it seems to work, and it is executed only on startup, so no perf loss on the critical path.
653: * If somebody wants to improve this, please be my guest.
654: */
655: String root_name = root.getNodeName();
656: if (!PROTOCOL_STACKS.equals(root_name.trim().toLowerCase())) {
657: String error = "XML protocol stack configuration does not start with a '<config>' element; "
658: + "maybe the XML configuration needs to be converted to the new format ?\n"
659: + "use 'java org.jgroups.conf.XmlConfigurator <old XML file> -new_format' to do so";
660: throw new IOException("invalid XML configuration: " + error);
661: }
662:
663: NodeList tmp_stacks = root.getChildNodes();
664: for (int i = 0; i < tmp_stacks.getLength(); i++) {
665: Node node = tmp_stacks.item(i);
666: if (node.getNodeType() != Node.ELEMENT_NODE)
667: continue;
668:
669: Element stack = (Element) node;
670: String tmp = stack.getNodeName();
671: if (!STACK.equals(tmp.trim().toLowerCase())) {
672: throw new IOException(
673: "invalid configuration: didn't find a \""
674: + STACK + "\" element under \""
675: + PROTOCOL_STACKS + "\"");
676: }
677:
678: NamedNodeMap attrs = stack.getAttributes();
679: Node name = attrs.getNamedItem(NAME);
680: // Node descr=attrs.getNamedItem(DESCR);
681: String st_name = name.getNodeValue();
682: // String stack_descr=descr.getNodeValue();
683: // System.out.print("Parsing \"" + st_name + "\" (" + stack_descr + ")");
684: NodeList configs = stack.getChildNodes();
685: for (int j = 0; j < configs.getLength(); j++) {
686: Node tmp_config = configs.item(j);
687: if (tmp_config.getNodeType() != Node.ELEMENT_NODE)
688: continue;
689: Element cfg = (Element) tmp_config;
690: tmp = cfg.getNodeName();
691: if (!CONFIG.equals(tmp))
692: throw new IOException(
693: "invalid configuration: didn't find a \""
694: + CONFIG + "\" element under \""
695: + STACK + "\"");
696:
697: XmlConfigurator conf = XmlConfigurator.getInstance(cfg);
698: // fixes http://jira.jboss.com/jira/browse/JGRP-290
699: ConfiguratorFactory.substituteVariables(conf); // replace vars with system props
700: String val = conf.getProtocolStackString();
701: this .stacks.put(st_name, val);
702: }
703: //System.out.println(" - OK");
704: }
705: // System.out.println("stacks: ");
706: // for(Iterator it=stacks.entrySet().iterator(); it.hasNext();) {
707: // Map.Entry entry=(Map.Entry)it.next();
708: // System.out.println("key: " + entry.getKey());
709: // System.out.println("val: " + entry.getValue() + "\n");
710: // }
711: }
712:
713: /**
714: * Returns the stack configuration as a string (to be fed into new JChannel()). Throws an exception
715: * if the stack_name is not found. One of the setMultiplexerConfig() methods had to be called beforehand
716: * @return The protocol stack config as a plain string
717: */
718: private String getConfig(String stack_name) throws Exception {
719: String cfg = (String) stacks.get(stack_name);
720: if (cfg == null)
721: throw new Exception("stack \"" + stack_name
722: + "\" not found in " + stacks.keySet());
723: return cfg;
724: }
725:
726: private static class Entry {
727: JChannel channel;
728: Multiplexer multiplexer;
729: }
730:
731: }
|